Jackie-Jiang commented on a change in pull request #5605:
URL: https://github.com/apache/incubator-pinot/pull/5605#discussion_r445227178



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
##########
@@ -171,106 +160,81 @@ public void reduceAndSetResults(String tableName, 
DataSchema dataSchema,
    */
   private void setSQLGroupByInResultTable(BrokerResponseNative 
brokerResponseNative, DataSchema dataSchema,
       Collection<DataTable> dataTables) {
-
+    DataSchema resultTableSchema = getSQLResultTableSchema(dataSchema);
     IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables);
-
-    int[] finalSchemaMapIdx = null;
-    if (_sqlSelectionList != null) {
-      finalSchemaMapIdx = getFinalSchemaMapIdx();
-    }
-    List<Object[]> rows = new ArrayList<>();
     Iterator<Record> sortedIterator = indexedTable.iterator();
-    int numRows = 0;
-    while (numRows < _groupBy.getTopN() && sortedIterator.hasNext()) {
-      Record nextRecord = sortedIterator.next();
-      Object[] values = nextRecord.getValues();
-
-      int index = _numGroupBy;
-      int aggNum = 0;
-      while (index < _numColumns) {
-        values[index] = AggregationFunctionUtils
-            
.getSerializableValue(_aggregationFunctions[aggNum++].extractFinalResult(values[index]));
-        index++;
+    int limit = _queryContext.getLimit();
+    List<Object[]> rows = new ArrayList<>(limit);
+
+    if (_sqlQuery) {
+      // NOTE: For SQL query, need to reorder the columns in the data table 
based on the select expressions.
+
+      int[] selectExpressionIndexMap = getSelectExpressionIndexMap();
+      int numSelectExpressions = selectExpressionIndexMap.length;
+      String[] columnNames = resultTableSchema.getColumnNames();
+      DataSchema.ColumnDataType[] columnDataTypes = 
resultTableSchema.getColumnDataTypes();
+      String[] reorderedColumnNames = new String[numSelectExpressions];
+      DataSchema.ColumnDataType[] reorderedColumnDataTypes = new 
DataSchema.ColumnDataType[numSelectExpressions];
+      resultTableSchema = new DataSchema(reorderedColumnNames, 
reorderedColumnDataTypes);
+      for (int i = 0; i < numSelectExpressions; i++) {
+        reorderedColumnNames[i] = columnNames[selectExpressionIndexMap[i]];
+        reorderedColumnDataTypes[i] = 
columnDataTypes[selectExpressionIndexMap[i]];
       }
-      if (_sqlSelectionList != null) {
-        Object[] finalValues = new Object[_sqlSelectionList.size()];
-        for (int i = 0; i < finalSchemaMapIdx.length; i++) {
-          finalValues[i] = values[finalSchemaMapIdx[i]];
+      while (rows.size() < limit && sortedIterator.hasNext()) {
+        Record nextRecord = sortedIterator.next();
+        Object[] values = nextRecord.getValues();
+        for (int i = 0; i < _numAggregationFunctions; i++) {
+          int valueIndex = i + _numGroupByExpressions;
+          values[valueIndex] = AggregationFunctionUtils
+              
.getSerializableValue(_aggregationFunctions[i].extractFinalResult(values[valueIndex]));
         }
-        rows.add(finalValues);
-      } else {
-        rows.add(values);
-      }
-      numRows++;
-    }
 
-    DataSchema finalDataSchema = getSQLResultTableSchema(dataSchema);
-    if (_sqlSelectionList != null) {
-      int columnSize = _sqlSelectionList.size();
-      String[] columns = new String[columnSize];
-      DataSchema.ColumnDataType[] columnDataTypes = new 
DataSchema.ColumnDataType[columnSize];
-      for (int i = 0; i < columnSize; i++) {
-        columns[i] = finalDataSchema.getColumnName(finalSchemaMapIdx[i]);
-        columnDataTypes[i] = 
finalDataSchema.getColumnDataType(finalSchemaMapIdx[i]);
+        Object[] reorderedValues = new Object[numSelectExpressions];
+        for (int i = 0; i < numSelectExpressions; i++) {
+          reorderedValues[i] = values[selectExpressionIndexMap[i]];
+        }
+        rows.add(reorderedValues);
       }
-      finalDataSchema = new DataSchema(columns, columnDataTypes);
-    }
-    brokerResponseNative.setResultTable(new ResultTable(finalDataSchema, 
rows));
-  }
-
-  /**
-   * Generate index mapping based on selection expression to DataTable schema, 
which is groupBy columns,
-   * then aggregation functions.
-   *
-   * @return a mapping from final schema idx to corresponding idx in data 
table schema.
-   */
-  private int[] getFinalSchemaMapIdx() {
-    int[] finalSchemaMapIdx = new int[_sqlSelectionList.size()];
-    int nextAggregationIdx = _numGroupBy;
-    for (int i = 0; i < _sqlSelectionList.size(); i++) {
-      finalSchemaMapIdx[i] = getExpressionMapIdx(_sqlSelectionList.get(i), 
nextAggregationIdx);
-      if (finalSchemaMapIdx[i] == nextAggregationIdx) {
-        nextAggregationIdx++;
+    } else {
+      while (rows.size() < limit && sortedIterator.hasNext()) {

Review comment:
       Added




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to