walterddr commented on code in PR #9892:
URL: https://github.com/apache/pinot/pull/9892#discussion_r1039932237


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -99,6 +107,181 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
+  /**
+   * Composes the {@link TransferableBlock} for a leaf-stage (V1) response so that the results match the
+   * projection schema expected by the calcite logical operator.
+   * <p> The clean-up mechanism applied depends on the concrete type of the {@link BaseResultsBlock} and on the
+   *     {@link org.apache.pinot.core.query.request.context.QueryContext}.</p>
+   * <p> Canonicalization of the data types is also applied during this post-process step.</p>
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock} that conforms with the desiredDataSchema
+   * @throws IllegalArgumentException if the results block is not a selection, aggregation, group-by or distinct
+   *     results block
+   */
+  private static TransferableBlock composeTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    BaseResultsBlock block = responseBlock.getResultsBlock();
+    // Dispatch on the concrete results block type; each branch returns immediately.
+    if (block instanceof SelectionResultsBlock) {
+      return composeSelectTransferableBlock(responseBlock, desiredDataSchema);
+    }
+    if (block instanceof AggregationResultsBlock) {
+      return composeAggregationTransferableBlock(responseBlock, desiredDataSchema);
+    }
+    if (block instanceof GroupByResultsBlock) {
+      return composeGroupByTransferableBlock(responseBlock, desiredDataSchema);
+    }
+    if (block instanceof DistinctResultsBlock) {
+      return composeDistinctTransferableBlock(responseBlock, desiredDataSchema);
+    }
+    throw new IllegalArgumentException("Unsupported result block type: " + block);
+  }
+
+  /**
+   * Checks that the distinct columns line up with the leaf-stage result schema and then converts the rows
+   * directly, without re-arranging columns. Re-arranging would only be needed when distinct does not conform
+   * with the selection columns, specifically:
+   * <ul>
+   *   <li> when distinct is not returning final result:
+   *       it should never happen as non-final result contains Object opaque columns v2 engine can't process.</li>
+   *   <li> when distinct columns are not all being selected:
+   *       it should never happen as leaf stage MUST return the entire list.</li>
+   * </ul>
+   *
+   * @param responseBlock result block from leaf stage; its first aggregation function is cast to
+   *     {@link DistinctAggregationFunction}
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock}
+   * @throws IllegalStateException if the {@code inOrder} check on the distinct column indices fails
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema)
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns = Arrays.asList(
+        ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
+    // Resolve the columns against the schema the leaf stage actually produced (as the group-by and aggregation
+    // paths do), so that the "Actual" part of the failure message reports something different from "Expected".
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    // Template form: the message is only formatted when the check fails.
+    Preconditions.checkState(inOrder(columnIndices),
+        "Incompatible distinct table schema for leaf stage. Expected: %s. Actual: %s",
+        desiredDataSchema, resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's group by result schema, thus we only need
+   * to check for correctness, similar to the distinct case, before converting the rows directly.
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock}
+   * @throws IllegalStateException if the {@code inOrder} check on the select-expression column indices fails
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema)
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema)
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // GROUP-BY column names conform with the string form of the select expressions
+    List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream()
+        .map(Object::toString).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    // Template form: the message is only formatted when the check fails.
+    Preconditions.checkState(inOrder(columnIndices),
+        "Incompatible group by result schema for leaf stage. Expected: %s. Actual: %s",
+        desiredDataSchema, resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+
+  /**
+   * Calcite generated {@link DataSchema} should conform with Pinot's agg result schema, thus we only need to
+   * check for correctness, similar to the distinct case, before converting the rows directly.
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock}
+   * @throws IllegalStateException if the {@code inOrder} check on the aggregation column indices fails
+   * @see LeafStageTransferableBlockOperator#composeDirectTransferableBlock(InstanceResponseBlock, DataSchema)
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema)
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    // AGG-ONLY column names are derived from AggFunction.getColumnName()
+    List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions())
+        .map(a -> a.getColumnName()).collect(Collectors.toList());
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    // Template form: the message is only formatted when the check fails.
+    Preconditions.checkState(inOrder(columnIndices),
+        "Incompatible aggregate result schema for leaf stage. Expected: %s. Actual: %s",
+        desiredDataSchema, resultSchema);
+    return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+  }
+
+  /**
+   * Only re-arranges columns to match the projection in the case of select / order-by, when the selection
+   * columns are not already in order in the result block schema. Otherwise the rows are converted directly.
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema the desired schema for send operator
+   * @return the converted {@link TransferableBlock}; when columns are re-arranged it is built with the
+   *     projected result schema rather than with desiredDataSchema
+   * @throws IllegalStateException if the projected result schema's column types are not compatible with those
+   *     of desiredDataSchema
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema)
+   */
+  @SuppressWarnings("ConstantConditions")
+  private static TransferableBlock composeSelectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    List<String> selectionColumns =
+        SelectionOperatorUtils.getSelectionColumns(responseBlock.getQueryContext(), resultSchema);
+    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    if (inOrder(columnIndices)) {
+      // Columns already line up with the projection: no re-arrangement needed.
+      return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
+    }
+    DataSchema adjustedResultSchema = SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+    // Template form: the message is only formatted when the check fails.
+    Preconditions.checkState(
+        isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+            adjustedResultSchema.getColumnDataTypes()),
+        "Incompatible result data schema: Expecting: %s Actual: %s", desiredDataSchema, adjustedResultSchema);
+    // Row extraction is handled entirely by the column-indexed composer.
+    return composeColumnIndexedTransferableBlock(responseBlock, adjustedResultSchema, columnIndices);
+  }
+
+  /**
+   * Creates a {@link TransferableBlock} by canonicalizing every result row through the given column indices.
+   * When the query has order-by expressions the rows are polled from the priority queue one by one (which
+   * empties it); otherwise the rows are iterated as-is.
+   *
+   * @param responseBlock result block from leaf stage
+   * @param desiredDataSchema schema attached to the returned block and used for canonicalization
+   * @param columnIndices column indices passed through to {@code canonicalizeRow}
+   * @return a {@link DataBlock.Type#ROW} typed {@link TransferableBlock} holding the converted rows
+   * @see LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock, DataSchema)
+   */
+  private static TransferableBlock composeColumnIndexedTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema, int[] columnIndices) {
+    Collection<Object[]> rows = responseBlock.getRows();
+    List<Object[]> convertedRows = new ArrayList<>(rows.size());
+    boolean ordered = responseBlock.getQueryContext().getOrderByExpressions() != null;
+    if (!ordered) {
+      // no ordering requested: plain iteration over the rows
+      for (Object[] currentRow : rows) {
+        convertedRows.add(canonicalizeRow(currentRow, desiredDataSchema, columnIndices));
+      }
+    } else {
+      // ordering requested: drain the queue so rows come out in queue order
+      PriorityQueue<Object[]> heap = (PriorityQueue<Object[]>) rows;
+      Object[] head;
+      // poll() returns null once the queue is empty
+      while ((head = heap.poll()) != null) {
+        convertedRows.add(canonicalizeRow(head, desiredDataSchema, columnIndices));
+      }
+    }
+    return new TransferableBlock(convertedRows, desiredDataSchema, DataBlock.Type.ROW);
+  }
+
+  /**
+   * Fallback mechanism for {@link TransferableBlock}, used when no special 
handling is necessary. This method only
+   * performs {@link DataSchema.ColumnDataType} canonicalization.
+   *
+   * @see 
LeafStageTransferableBlockOperator#composeTransferableBlock(InstanceResponseBlock,
 DataSchema).
+   */
+  private static TransferableBlock 
composeDirectTransferableBlock(InstanceResponseBlock responseBlock,
+      DataSchema desiredDataSchema) {
+    Collection<Object[]> resultRows = responseBlock.getRows();
+    List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
+    if (resultRows instanceof List) {
+      for (Object[] orgRow : resultRows) {
+        extractedRows.add(canonicalizeRow(orgRow, desiredDataSchema));
+      }
+    } else if (resultRows instanceof PriorityQueue) {

Review Comment:
   good idea. i will change



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to