amrishlal commented on a change in pull request #8029:
URL: https://github.com/apache/pinot/pull/8029#discussion_r815203052



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PreAggregationGapFillDataTableReducer.java
##########
@@ -0,0 +1,775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.GapfillUtils;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+/**
+ * Helper class to reduce and set Aggregation results into the BrokerResponseNative
+ */

Review comment:
       /** Helper class to reduce and set gap fill results into the BrokerResponseNative */

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PreAggregationGapFillDataTableReducer.java
##########
@@ -0,0 +1,775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.GapfillUtils;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+/**
+ * Helper class to reduce and set Aggregation results into the BrokerResponseNative
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class PreAggregationGapFillDataTableReducer implements DataTableReducer {
+  private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, find a better value.
+
+  private final QueryContext _queryContext;
+
+  private final int _limitForAggregatedResult;
+  private int _limitForGapfilledResult;
+
+  private final DateTimeGranularitySpec _dateTimeGranularity;
+  private final DateTimeFormatSpec _dateTimeFormatter;
+  private final long _startMs;
+  private final long _endMs;
+  private final long _timeBucketSize;
+
+  private final List<Integer> _groupByKeyIndexes;
+  private boolean [] _isGroupBySelections;
+  private final Set<Key> _groupByKeys;
+  private final Map<Key, Object[]> _previousByGroupKey;
+  private final Map<String, ExpressionContext> _fillExpressions;
+  private final List<ExpressionContext> _timeSeries;
+  private final GapfillUtils.GapfillType _gapfillType;
+
+  PreAggregationGapFillDataTableReducer(QueryContext queryContext) {
+    _queryContext = queryContext;
+    _gapfillType = queryContext.getGapfillType();
+    _limitForAggregatedResult = queryContext.getLimit();
+    if (_gapfillType == GapfillUtils.GapfillType.AGGREGATE_GAP_FILL
+        || _gapfillType == GapfillUtils.GapfillType.GAP_FILL) {
+      _limitForGapfilledResult = queryContext.getLimit();
+    } else {
+      _limitForGapfilledResult = queryContext.getSubQueryContext().getLimit();
+    }
+
+    ExpressionContext gapFillSelection = GapfillUtils.getGapfillExpressionContext(queryContext);
+
+    Preconditions.checkArgument(
+        gapFillSelection != null && gapFillSelection.getFunction() != null, "Gapfill Expression should be function.");
+    List<ExpressionContext> args = gapFillSelection.getFunction().getArguments();
+    Preconditions.checkArgument(
+        args.size() > 5, "PreAggregateGapFill does not have correct number of arguments.");
+    Preconditions.checkArgument(
+        args.get(1).getLiteral() != null, "The second argument of PostAggregateGapFill should be TimeFormatter.");
+    Preconditions.checkArgument(
+        args.get(2).getLiteral() != null, "The third argument of PostAggregateGapFill should be start time.");
+    Preconditions.checkArgument(
+        args.get(3).getLiteral() != null, "The fourth argument of PostAggregateGapFill should be end time.");
+    Preconditions.checkArgument(
+        args.get(4).getLiteral() != null, "The fifth argument of PostAggregateGapFill should be time bucket size.");

Review comment:
       I am wondering whether these checks are needed, because by the time we get to this part the user query would already have been validated at parsing/compilation time. If they are still needed, can these checks be moved to parsing/compilation time?
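
       To make the suggestion concrete, here is a rough sketch of how these argument checks could be hoisted into a single parse/compile-time validator so the reducer can assume a well-formed gapfill call. The class name GapfillQueryValidator and the exact hook into query compilation are hypothetical, not existing Pinot APIs:

       import com.google.common.base.Preconditions;
       import java.util.List;
       import org.apache.pinot.common.request.context.ExpressionContext;

       // Hypothetical helper; where exactly it is invoked during parsing/compilation is left open.
       public class GapfillQueryValidator {
         /** Runs once before query execution, so the reducer no longer needs these Preconditions. */
         public static void validate(ExpressionContext gapFillSelection) {
           Preconditions.checkArgument(gapFillSelection != null && gapFillSelection.getFunction() != null,
               "Gapfill expression should be a function.");
           List<ExpressionContext> args = gapFillSelection.getFunction().getArguments();
           Preconditions.checkArgument(args.size() > 5, "Gapfill does not have the correct number of arguments.");
           // Arguments 2-5 (time format, start time, end time, time bucket size) must be literals.
           for (int i = 1; i <= 4; i++) {
             Preconditions.checkArgument(args.get(i).getLiteral() != null,
                 "Gapfill argument " + (i + 1) + " should be a literal.");
           }
         }
       }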

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PreAggregationGapFillDataTableReducer.java
##########
@@ -0,0 +1,775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.GapfillUtils;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+/**
+ * Helper class to reduce and set Aggregation results into the BrokerResponseNative
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class PreAggregationGapFillDataTableReducer implements DataTableReducer {
+  private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, find a better value.
+
+  private final QueryContext _queryContext;
+
+  private final int _limitForAggregatedResult;
+  private int _limitForGapfilledResult;
+
+  private final DateTimeGranularitySpec _dateTimeGranularity;
+  private final DateTimeFormatSpec _dateTimeFormatter;
+  private final long _startMs;
+  private final long _endMs;
+  private final long _timeBucketSize;
+
+  private final List<Integer> _groupByKeyIndexes;
+  private boolean [] _isGroupBySelections;
+  private final Set<Key> _groupByKeys;
+  private final Map<Key, Object[]> _previousByGroupKey;
+  private final Map<String, ExpressionContext> _fillExpressions;
+  private final List<ExpressionContext> _timeSeries;
+  private final GapfillUtils.GapfillType _gapfillType;
+
+  PreAggregationGapFillDataTableReducer(QueryContext queryContext) {
+    _queryContext = queryContext;
+    _gapfillType = queryContext.getGapfillType();
+    _limitForAggregatedResult = queryContext.getLimit();
+    if (_gapfillType == GapfillUtils.GapfillType.AGGREGATE_GAP_FILL
+        || _gapfillType == GapfillUtils.GapfillType.GAP_FILL) {
+      _limitForGapfilledResult = queryContext.getLimit();
+    } else {
+      _limitForGapfilledResult = queryContext.getSubQueryContext().getLimit();
+    }
+
+    ExpressionContext gapFillSelection = GapfillUtils.getGapfillExpressionContext(queryContext);
+
+    Preconditions.checkArgument(
+        gapFillSelection != null && gapFillSelection.getFunction() != null, "Gapfill Expression should be function.");
+    List<ExpressionContext> args = gapFillSelection.getFunction().getArguments();
+    Preconditions.checkArgument(
+        args.size() > 5, "PreAggregateGapFill does not have correct number of arguments.");
+    Preconditions.checkArgument(
+        args.get(1).getLiteral() != null, "The second argument of PostAggregateGapFill should be TimeFormatter.");
+    Preconditions.checkArgument(
+        args.get(2).getLiteral() != null, "The third argument of PostAggregateGapFill should be start time.");
+    Preconditions.checkArgument(
+        args.get(3).getLiteral() != null, "The fourth argument of PostAggregateGapFill should be end time.");
+    Preconditions.checkArgument(
+        args.get(4).getLiteral() != null, "The fifth argument of PostAggregateGapFill should be time bucket size.");
+
+    _dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral());
+    _dateTimeGranularity = new DateTimeGranularitySpec(args.get(4).getLiteral());
+    String start = args.get(2).getLiteral();
+    _startMs = truncate(_dateTimeFormatter.fromFormatToMillis(start));
+    String end = args.get(3).getLiteral();
+    _endMs = truncate(_dateTimeFormatter.fromFormatToMillis(end));
+    _timeBucketSize = _dateTimeGranularity.granularityToMillis();
+
+    _fillExpressions = GapfillUtils.getFillExpressions(gapFillSelection);
+
+    _previousByGroupKey = new HashMap<>();
+    _groupByKeyIndexes = new ArrayList<>();
+    _groupByKeys = new HashSet<>();
+
+    ExpressionContext timeseriesOn = GapfillUtils.getTimeSeriesOnExpressionContext(gapFillSelection);
+    Preconditions.checkArgument(timeseriesOn != null, "The TimeSeriesOn expressions should be specified.");
+    _timeSeries = timeseriesOn.getFunction().getArguments();
+  }
+
+  private void replaceColumnNameWithAlias(DataSchema dataSchema) {
+    QueryContext queryContext;
+    if (_gapfillType == GapfillUtils.GapfillType.AGGREGATE_GAP_FILL_AGGREGATE) {
+      queryContext = _queryContext.getSubQueryContext().getSubQueryContext();
+    } else if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL) {
+      queryContext = _queryContext;
+    } else {
+      queryContext = _queryContext.getSubQueryContext();
+    }
+    List<String> aliasList = queryContext.getAliasList();
+    Map<String, String> columnNameToAliasMap = new HashMap<>();
+    for (int i = 0; i < aliasList.size(); i++) {
+      if (aliasList.get(i) != null) {
+        ExpressionContext selection = queryContext.getSelectExpressions().get(i);
+        if (GapfillUtils.isGapfill(selection)) {
+          selection = selection.getFunction().getArguments().get(0);
+        }
+        columnNameToAliasMap.put(selection.toString(), aliasList.get(i));
+      }
+    }
+    for (int i = 0; i < dataSchema.getColumnNames().length; i++) {
+      if (columnNameToAliasMap.containsKey(dataSchema.getColumnNames()[i])) {
+        dataSchema.getColumnNames()[i] = columnNameToAliasMap.get(dataSchema.getColumnNames()[i]);
+      }
+    }
+  }
+
+  /**
+   * Computes the number of reduce threads to use per query.
+   * <ul>
+   *   <li> Use single thread if number of data tables to reduce is less than
+   *   {@value #MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE}.</li>
+   *   <li> Else, use min of max allowed reduce threads per query, and number of data tables.</li>
+   * </ul>
+   *
+   * @param numDataTables Number of data tables to reduce
+   * @param maxReduceThreadsPerQuery Max allowed reduce threads per query
+   * @return Number of reduce threads to use for the query
+   */
+  private int getNumReduceThreadsToUse(int numDataTables, int maxReduceThreadsPerQuery) {
+    // Use single thread if number of data tables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE.
+    if (numDataTables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE) {
+      return Math.min(1, numDataTables); // Number of data tables can be zero.
+    }
+
+    return Math.min(maxReduceThreadsPerQuery, numDataTables);
+  }
+
+  private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTablesToReduce,
+      DataTableReducerContext reducerContext)
+      throws TimeoutException {
+    long start = System.currentTimeMillis();
+    int numDataTables = dataTablesToReduce.size();
+
+    // Get the number of threads to use for reducing.
+    // In case of single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking calls.
+    int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
+    int limit = _queryContext.getLimit();
+    // TODO: Make minTrimSize configurable
+    int trimSize = GroupByUtils.getTableCapacity(limit);
+    // NOTE: For query with HAVING clause, use trimSize as resultSize to ensure the result accuracy.
+    // TODO: Resolve the HAVING clause within the IndexedTable before returning the result
+    int resultSize = _queryContext.getHavingFilter() != null ? trimSize : limit;
+    int trimThreshold = reducerContext.getGroupByTrimThreshold();
+    IndexedTable indexedTable;
+    if (numReduceThreadsToUse <= 1) {
+      indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
+    } else {
+      if (trimThreshold >= GroupByOrderByCombineOperator.MAX_TRIM_THRESHOLD) {
+        // special case of trim threshold where it is set to max value.
+        // there won't be any trimming during upsert in this case.
+        // thus we can avoid the overhead of read-lock and write-lock
+        // in the upsert method.
+        indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, resultSize);
+      } else {
+        indexedTable = new ConcurrentIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
+      }
+    }
+
+    Future[] futures = new Future[numDataTables];
+    CountDownLatch countDownLatch = new CountDownLatch(numDataTables);
+
+    // Create groups of data tables that each thread can process concurrently.
+    // Given that numReduceThreads is <= numDataTables, each group will have at least one data table.
+    ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
+    List<List<DataTable>> reduceGroups = new ArrayList<>(numReduceThreadsToUse);
+
+    for (int i = 0; i < numReduceThreadsToUse; i++) {
+      reduceGroups.add(new ArrayList<>());
+    }
+    for (int i = 0; i < numDataTables; i++) {
+      reduceGroups.get(i % numReduceThreadsToUse).add(dataTables.get(i));
+    }
+
+    int cnt = 0;
+    ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
+    int numColumns = storedColumnDataTypes.length;
+    for (List<DataTable> reduceGroup : reduceGroups) {
+      futures[cnt++] = reducerContext.getExecutorService().submit(new TraceRunnable() {
+        @Override
+        public void runJob() {
+          for (DataTable dataTable : reduceGroup) {
+            int numRows = dataTable.getNumberOfRows();
+
+            try {
+              for (int rowId = 0; rowId < numRows; rowId++) {
+                Object[] values = new Object[numColumns];
+                for (int colId = 0; colId < numColumns; colId++) {
+                  switch (storedColumnDataTypes[colId]) {
+                    case INT:
+                      values[colId] = dataTable.getInt(rowId, colId);
+                      break;
+                    case LONG:
+                      values[colId] = dataTable.getLong(rowId, colId);
+                      break;
+                    case FLOAT:
+                      values[colId] = dataTable.getFloat(rowId, colId);
+                      break;
+                    case DOUBLE:
+                      values[colId] = dataTable.getDouble(rowId, colId);
+                      break;
+                    case STRING:
+                      values[colId] = dataTable.getString(rowId, colId);
+                      break;
+                    case BYTES:
+                      values[colId] = dataTable.getBytes(rowId, colId);
+                      break;
+                    case OBJECT:
+                      values[colId] = dataTable.getObject(rowId, colId);
+                      break;
+                    // Add other aggregation intermediate result / group-by column type supports here
+                    default:
+                      throw new IllegalStateException();
+                  }
+                }
+                indexedTable.upsert(new Record(values));
+              }
+            } finally {
+              countDownLatch.countDown();
+            }
+          }
+        }
+      });
+    }
+
+    try {
+      long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start);
+      countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      for (Future future : futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
+      throw new TimeoutException("Timed out in broker reduce phase.");
+    }
+
+    indexedTable.finish(true);
+    return indexedTable;
+  }
+
+  /**
+   * Here are three things that happen
+   * 1. Sort the result sets from all pinot servers based on timestamp
+   * 2. Gapfill the data for missing entities per time bucket
+   * 3. Aggregate the dataset per time bucket.
+   */
+  @Override
+  public void reduceAndSetResults(String tableName, DataSchema dataSchema,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
+      DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
+    DataSchema resultTableSchema = getResultTableDataSchema(dataSchema);
+    if (dataTableMap.isEmpty()) {
+      brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, Collections.emptyList()));
+      return;
+    }
+
+    String[] columns = dataSchema.getColumnNames();
+
+    Map<String, Integer> indexes = new HashMap<>();
+    for (int i = 0; i < columns.length; i++) {
+      indexes.put(columns[i], i);
+    }
+
+    _isGroupBySelections = new boolean[dataSchema.getColumnDataTypes().length];
+
+    // The first argument of timeSeries is the time column. The remaining ones define the entity.
+    for (ExpressionContext entityColum : _timeSeries) {
+      int index = indexes.get(entityColum.getIdentifier());
+      _isGroupBySelections[index] = true;
+      _groupByKeyIndexes.add(index);
+    }
+
+    List<Object[]> sortedRawRows;
+    if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL_AGGREGATE
+        || _gapfillType == GapfillUtils.GapfillType.GAP_FILL
+        || _gapfillType == GapfillUtils.GapfillType.GAP_FILL_SELECT) {
+      sortedRawRows = mergeAndSort(dataTableMap.values(), dataSchema);
+    } else {
+      try {
+        IndexedTable indexedTable = getIndexedTable(dataSchema, dataTableMap.values(), reducerContext);
+        sortedRawRows = mergeAndSort(indexedTable, dataSchema);
+      } catch (TimeoutException e) {
+        brokerResponseNative.getProcessingExceptions()
+            .add(new QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, e.getMessage()));
+        return;
+      }
+    }
+    List<Object[]> resultRows;
+    replaceColumnNameWithAlias(dataSchema);
+    if (_queryContext.getAggregationFunctions() != null) {
+      validateGroupByForOuterQuery();

Review comment:
       Why do we need to do this validation? If the _queryContext object was constructed properly, there should be no need for it. Can this validation be moved to the compilation stage?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PreAggregationGapFillDataTableReducer.java
##########
@@ -0,0 +1,775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.GapfillUtils;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+/**
+ * Helper class to reduce and set Aggregation results into the BrokerResponseNative
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class PreAggregationGapFillDataTableReducer implements DataTableReducer {
+  private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, find a better value.
+
+  private final QueryContext _queryContext;
+
+  private final int _limitForAggregatedResult;
+  private int _limitForGapfilledResult;
+
+  private final DateTimeGranularitySpec _dateTimeGranularity;
+  private final DateTimeFormatSpec _dateTimeFormatter;
+  private final long _startMs;
+  private final long _endMs;
+  private final long _timeBucketSize;
+
+  private final List<Integer> _groupByKeyIndexes;
+  private boolean [] _isGroupBySelections;
+  private final Set<Key> _groupByKeys;
+  private final Map<Key, Object[]> _previousByGroupKey;
+  private final Map<String, ExpressionContext> _fillExpressions;
+  private final List<ExpressionContext> _timeSeries;
+  private final GapfillUtils.GapfillType _gapfillType;
+
+  PreAggregationGapFillDataTableReducer(QueryContext queryContext) {
+    _queryContext = queryContext;
+    _gapfillType = queryContext.getGapfillType();
+    _limitForAggregatedResult = queryContext.getLimit();
+    if (_gapfillType == GapfillUtils.GapfillType.AGGREGATE_GAP_FILL
+        || _gapfillType == GapfillUtils.GapfillType.GAP_FILL) {
+      _limitForGapfilledResult = queryContext.getLimit();
+    } else {
+      _limitForGapfilledResult = queryContext.getSubQueryContext().getLimit();
+    }
+
+    ExpressionContext gapFillSelection = GapfillUtils.getGapfillExpressionContext(queryContext);
+
+    Preconditions.checkArgument(
+        gapFillSelection != null && gapFillSelection.getFunction() != null, "Gapfill Expression should be function.");
+    List<ExpressionContext> args = gapFillSelection.getFunction().getArguments();
+    Preconditions.checkArgument(
+        args.size() > 5, "PreAggregateGapFill does not have correct number of arguments.");
+    Preconditions.checkArgument(
+        args.get(1).getLiteral() != null, "The second argument of PostAggregateGapFill should be TimeFormatter.");
+    Preconditions.checkArgument(
+        args.get(2).getLiteral() != null, "The third argument of PostAggregateGapFill should be start time.");
+    Preconditions.checkArgument(
+        args.get(3).getLiteral() != null, "The fourth argument of PostAggregateGapFill should be end time.");
+    Preconditions.checkArgument(
+        args.get(4).getLiteral() != null, "The fifth argument of PostAggregateGapFill should be time bucket size.");
+
+    _dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral());
+    _dateTimeGranularity = new DateTimeGranularitySpec(args.get(4).getLiteral());
+    String start = args.get(2).getLiteral();
+    _startMs = truncate(_dateTimeFormatter.fromFormatToMillis(start));
+    String end = args.get(3).getLiteral();
+    _endMs = truncate(_dateTimeFormatter.fromFormatToMillis(end));
+    _timeBucketSize = _dateTimeGranularity.granularityToMillis();
+
+    _fillExpressions = GapfillUtils.getFillExpressions(gapFillSelection);
+
+    _previousByGroupKey = new HashMap<>();
+    _groupByKeyIndexes = new ArrayList<>();
+    _groupByKeys = new HashSet<>();
+
+    ExpressionContext timeseriesOn = GapfillUtils.getTimeSeriesOnExpressionContext(gapFillSelection);
+    Preconditions.checkArgument(timeseriesOn != null, "The TimeSeriesOn expressions should be specified.");
+    _timeSeries = timeseriesOn.getFunction().getArguments();
+  }
+
+  private void replaceColumnNameWithAlias(DataSchema dataSchema) {
+    QueryContext queryContext;
+    if (_gapfillType == GapfillUtils.GapfillType.AGGREGATE_GAP_FILL_AGGREGATE) {
+      queryContext = _queryContext.getSubQueryContext().getSubQueryContext();
+    } else if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL) {
+      queryContext = _queryContext;
+    } else {
+      queryContext = _queryContext.getSubQueryContext();
+    }
+    List<String> aliasList = queryContext.getAliasList();
+    Map<String, String> columnNameToAliasMap = new HashMap<>();
+    for (int i = 0; i < aliasList.size(); i++) {
+      if (aliasList.get(i) != null) {
+        ExpressionContext selection = queryContext.getSelectExpressions().get(i);
+        if (GapfillUtils.isGapfill(selection)) {
+          selection = selection.getFunction().getArguments().get(0);
+        }
+        columnNameToAliasMap.put(selection.toString(), aliasList.get(i));
+      }
+    }
+    for (int i = 0; i < dataSchema.getColumnNames().length; i++) {
+      if (columnNameToAliasMap.containsKey(dataSchema.getColumnNames()[i])) {
+        dataSchema.getColumnNames()[i] = columnNameToAliasMap.get(dataSchema.getColumnNames()[i]);
+      }
+    }
+  }
+
+  /**
+   * Computes the number of reduce threads to use per query.
+   * <ul>
+   *   <li> Use single thread if number of data tables to reduce is less than
+   *   {@value #MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE}.</li>
+   *   <li> Else, use min of max allowed reduce threads per query, and number of data tables.</li>
+   * </ul>
+   *
+   * @param numDataTables Number of data tables to reduce
+   * @param maxReduceThreadsPerQuery Max allowed reduce threads per query
+   * @return Number of reduce threads to use for the query
+   */
+  private int getNumReduceThreadsToUse(int numDataTables, int maxReduceThreadsPerQuery) {
+    // Use single thread if number of data tables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE.
+    if (numDataTables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE) {
+      return Math.min(1, numDataTables); // Number of data tables can be zero.
+    }
+
+    return Math.min(maxReduceThreadsPerQuery, numDataTables);
+  }
+
+  private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTablesToReduce,
+      DataTableReducerContext reducerContext)
+      throws TimeoutException {
+    long start = System.currentTimeMillis();
+    int numDataTables = dataTablesToReduce.size();
+
+    // Get the number of threads to use for reducing.
+    // In case of single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking calls.
+    int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
+    int limit = _queryContext.getLimit();
+    // TODO: Make minTrimSize configurable
+    int trimSize = GroupByUtils.getTableCapacity(limit);
+    // NOTE: For query with HAVING clause, use trimSize as resultSize to ensure the result accuracy.
+    // TODO: Resolve the HAVING clause within the IndexedTable before returning the result
+    int resultSize = _queryContext.getHavingFilter() != null ? trimSize : limit;
+    int trimThreshold = reducerContext.getGroupByTrimThreshold();
+    IndexedTable indexedTable;
+    if (numReduceThreadsToUse <= 1) {
+      indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
+    } else {
+      if (trimThreshold >= GroupByOrderByCombineOperator.MAX_TRIM_THRESHOLD) {
+        // special case of trim threshold where it is set to max value.
+        // there won't be any trimming during upsert in this case.
+        // thus we can avoid the overhead of read-lock and write-lock
+        // in the upsert method.
+        indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, resultSize);
+      } else {
+        indexedTable = new ConcurrentIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
+      }
+    }
+
+    Future[] futures = new Future[numDataTables];
+    CountDownLatch countDownLatch = new CountDownLatch(numDataTables);
+
+    // Create groups of data tables that each thread can process concurrently.
+    // Given that numReduceThreads is <= numDataTables, each group will have at least one data table.
+    ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
+    List<List<DataTable>> reduceGroups = new ArrayList<>(numReduceThreadsToUse);
+
+    for (int i = 0; i < numReduceThreadsToUse; i++) {
+      reduceGroups.add(new ArrayList<>());
+    }
+    for (int i = 0; i < numDataTables; i++) {
+      reduceGroups.get(i % numReduceThreadsToUse).add(dataTables.get(i));
+    }
+
+    int cnt = 0;
+    ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
+    int numColumns = storedColumnDataTypes.length;
+    for (List<DataTable> reduceGroup : reduceGroups) {
+      futures[cnt++] = reducerContext.getExecutorService().submit(new TraceRunnable() {
+        @Override
+        public void runJob() {
+          for (DataTable dataTable : reduceGroup) {
+            int numRows = dataTable.getNumberOfRows();
+
+            try {
+              for (int rowId = 0; rowId < numRows; rowId++) {
+                Object[] values = new Object[numColumns];
+                for (int colId = 0; colId < numColumns; colId++) {
+                  switch (storedColumnDataTypes[colId]) {
+                    case INT:
+                      values[colId] = dataTable.getInt(rowId, colId);
+                      break;
+                    case LONG:
+                      values[colId] = dataTable.getLong(rowId, colId);
+                      break;
+                    case FLOAT:
+                      values[colId] = dataTable.getFloat(rowId, colId);
+                      break;
+                    case DOUBLE:
+                      values[colId] = dataTable.getDouble(rowId, colId);
+                      break;
+                    case STRING:
+                      values[colId] = dataTable.getString(rowId, colId);
+                      break;
+                    case BYTES:
+                      values[colId] = dataTable.getBytes(rowId, colId);
+                      break;
+                    case OBJECT:
+                      values[colId] = dataTable.getObject(rowId, colId);
+                      break;
+                    // Add other aggregation intermediate result / group-by column type supports here
+                    default:
+                      throw new IllegalStateException();
+                  }
+                }
+                indexedTable.upsert(new Record(values));
+              }
+            } finally {
+              countDownLatch.countDown();
+            }
+          }
+        }
+      });
+    }
+
+    try {
+      long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start);
+      countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      for (Future future : futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
+      throw new TimeoutException("Timed out in broker reduce phase.");
+    }
+
+    indexedTable.finish(true);
+    return indexedTable;
+  }
+
+  /**
+   * Here are three things that happen
+   * 1. Sort the result sets from all pinot servers based on timestamp
+   * 2. Gapfill the data for missing entities per time bucket
+   * 3. Aggregate the dataset per time bucket.
+   */
+  @Override
+  public void reduceAndSetResults(String tableName, DataSchema dataSchema,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
+      DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
+    DataSchema resultTableSchema = getResultTableDataSchema(dataSchema);
+    if (dataTableMap.isEmpty()) {
+      brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, Collections.emptyList()));
+      return;
+    }
+
+    String[] columns = dataSchema.getColumnNames();
+
+    Map<String, Integer> indexes = new HashMap<>();
+    for (int i = 0; i < columns.length; i++) {
+      indexes.put(columns[i], i);
+    }
+
+    _isGroupBySelections = new boolean[dataSchema.getColumnDataTypes().length];
+
+    // The first argument of timeSeries is the time column. The remaining ones define the entity.
+    for (ExpressionContext entityColum : _timeSeries) {
+      int index = indexes.get(entityColum.getIdentifier());
+      _isGroupBySelections[index] = true;
+      _groupByKeyIndexes.add(index);
+    }
+
+    List<Object[]> sortedRawRows;
+    if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL_AGGREGATE
+        || _gapfillType == GapfillUtils.GapfillType.GAP_FILL
+        || _gapfillType == GapfillUtils.GapfillType.GAP_FILL_SELECT) {
+      sortedRawRows = mergeAndSort(dataTableMap.values(), dataSchema);
+    } else {
+      try {
+        IndexedTable indexedTable = getIndexedTable(dataSchema, dataTableMap.values(), reducerContext);
+        sortedRawRows = mergeAndSort(indexedTable, dataSchema);

Review comment:
       It seems like mergeAndSort could be made much faster if the servers pre-sorted their result sets before sending them to the broker. The servers would sort their result sets in parallel, and in that case the broker would only need to do a k-way merge here to produce a globally sorted result set, without doing a full global sort of all the records.
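
       For illustration, with pre-sorted per-server result sets the broker side becomes a k-way merge, e.g. over a PriorityQueue keyed on the time column. The sketch below only illustrates the idea and is not code from this PR; the row layout (timestamp as a long in column 0) and the assumption that every server ships its rows already sorted by that column are hypothetical:

       import java.util.ArrayList;
       import java.util.List;
       import java.util.PriorityQueue;

       public class KWayMergeSketch {
         /** Merges row lists that are each already sorted by the long timestamp in column 0. */
         static List<Object[]> kWayMerge(List<List<Object[]>> sortedResultSets) {
           // Each heap entry is {listIndex, rowIndex}, ordered by the timestamp of the row it points at.
           PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Long.compare(
               (long) sortedResultSets.get(a[0]).get(a[1])[0],
               (long) sortedResultSets.get(b[0]).get(b[1])[0]));
           for (int i = 0; i < sortedResultSets.size(); i++) {
             if (!sortedResultSets.get(i).isEmpty()) {
               heap.offer(new int[]{i, 0});
             }
           }
           List<Object[]> merged = new ArrayList<>();
           while (!heap.isEmpty()) {
             int[] head = heap.poll();
             List<Object[]> source = sortedResultSets.get(head[0]);
             merged.add(source.get(head[1]));
             if (head[1] + 1 < source.size()) {
               heap.offer(new int[]{head[0], head[1] + 1});
             }
           }
           return merged;
         }
       }

       This would be O(N log k) for N total rows from k servers, versus O(N log N) for a full re-sort at the broker.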

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PreAggregationGapFillDataTableReducer.java
##########
@@ -0,0 +1,775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.GapfillUtils;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+/**
+ * Helper class to reduce and set Aggregation results into the BrokerResponseNative
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class PreAggregationGapFillDataTableReducer implements DataTableReducer {

Review comment:
       Seems like this should be renamed to GapFillDataTableReducer?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



