Copilot commented on code in PR #17168:
URL: https://github.com/apache/pinot/pull/17168#discussion_r2539624168


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnnestOperator.java:
##########
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.plannode.UnnestNode;
+import org.apache.pinot.query.runtime.blocks.MseBlock;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
+import 
org.apache.pinot.query.runtime.operator.operands.TransformOperandFactory;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * UnnestOperator expands an array/collection value per input row into zero or 
more output rows.
+ * The output schema is provided by the associated UnnestNode's data schema.
+ */
+public class UnnestOperator extends MultiStageOperator {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(UnnestOperator.class);
+  private static final String EXPLAIN_NAME = "UNNEST";
+
+  private final MultiStageOperator _input;
+  private final TransformOperand _arrayExprOperand;
+  private final DataSchema _resultSchema;
+  private final boolean _withOrdinality;
+  private final int _elementIndex;
+  private final int _ordinalityIndex;
+  private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
+
+  public UnnestOperator(OpChainExecutionContext context, MultiStageOperator 
input, DataSchema inputSchema,
+      UnnestNode node) {
+    super(context);
+    _input = input;
+    _arrayExprOperand = 
TransformOperandFactory.getTransformOperand(node.getArrayExpr(), inputSchema);
+    _resultSchema = node.getDataSchema();
+    _withOrdinality = node.isWithOrdinality();
+    _elementIndex = node.getElementIndex();
+    _ordinalityIndex = node.getOrdinalityIndex();
+  }
+
+  @Override
+  public void registerExecution(long time, int numRows, long memoryUsedBytes, 
long gcTimeMs) {
+    _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
+    _statMap.merge(StatKey.EMITTED_ROWS, numRows);
+    _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+    _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
+  }
+
+  @Override
+  protected Logger logger() {
+    return LOGGER;
+  }
+
+  @Override
+  public List<MultiStageOperator> getChildOperators() {
+    return List.of(_input);
+  }
+
+  @Override
+  public Type getOperatorType() {
+    return Type.UNNEST;
+  }
+
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected MseBlock getNextBlock() {
+    MseBlock block = _input.nextBlock();
+    if (block.isEos()) {
+      return block;
+    }
+    MseBlock.Data dataBlock = (MseBlock.Data) block;
+    List<Object[]> inputRows = dataBlock.asRowHeap().getRows();
+    List<Object[]> outRows = new ArrayList<>();
+
+    for (Object[] row : inputRows) {
+      Object value = _arrayExprOperand.apply(row);
+      int ord = 1;
+      if (value == null) {
+        continue;
+      }
+      if (value instanceof List) {
+        for (Object element : (List<?>) value) {
+          outRows.add(appendElement(row, element, ord++));
+        }
+      } else if (value.getClass().isArray()) {
+        int length = java.lang.reflect.Array.getLength(value);
+        for (int i = 0; i < length; i++) {
+          Object element = java.lang.reflect.Array.get(value, i);
+          outRows.add(appendElement(row, element, ord++));

Review Comment:
   Using reflection (`java.lang.reflect.Array`) to access array elements can be 
inefficient for large arrays. Consider type-specific handling for primitive 
arrays (e.g., `int[]`, `long[]`) to avoid reflection overhead, especially since 
the operator will process many rows.
   ```suggestion
           if (value instanceof int[]) {
             int[] arr = (int[]) value;
             for (int i = 0; i < length; i++) {
               outRows.add(appendElement(row, arr[i], ord++));
             }
           } else if (value instanceof long[]) {
             long[] arr = (long[]) value;
             for (int i = 0; i < length; i++) {
               outRows.add(appendElement(row, arr[i], ord++));
             }
           } else if (value instanceof double[]) {
             double[] arr = (double[]) value;
             for (int i = 0; i < length; i++) {
               outRows.add(appendElement(row, arr[i], ord++));
             }
           } else if (value instanceof float[]) {
             float[] arr = (float[]) value;
             for (int i = 0; i < length; i++) {
               outRows.add(appendElement(row, arr[i], ord++));
             }
           } else if (value instanceof short[]) {
             short[] arr = (short[]) value;
             for (int i = 0; i < length; i++) {
               outRows.add(appendElement(row, arr[i], ord++));
             }
           } else if (value instanceof byte[]) {
             byte[] arr = (byte[]) value;
             for (int i = 0; i < length; i++) {
               outRows.add(appendElement(row, arr[i], ord++));
             }
           } else if (value instanceof char[]) {
             char[] arr = (char[]) value;
             for (int i = 0; i < length; i++) {
               outRows.add(appendElement(row, arr[i], ord++));
             }
           } else if (value instanceof boolean[]) {
             boolean[] arr = (boolean[]) value;
             for (int i = 0; i < length; i++) {
               outRows.add(appendElement(row, arr[i], ord++));
             }
           } else {
             Object[] arr = (Object[]) value;
             for (int i = 0; i < length; i++) {
               outRows.add(appendElement(row, arr[i], ord++));
             }
   ```



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java:
##########
@@ -159,6 +169,254 @@ public PlanNode toPlanNode(RelNode node) {
     return result;
   }
 
+  private UnnestNode convertLogicalUncollect(Uncollect node) {
+    // Expect input provides a single array expression (typically a Project 
with one expression)
+    RexExpression arrayExpr = null;
+    RelNode input = node.getInput();
+    if (input instanceof Project) {
+      Project p = (Project) input;
+      if (p.getProjects().size() == 1) {
+        arrayExpr = RexExpressionUtils.fromRexNode(p.getProjects().get(0));
+      }
+    }
+    if (arrayExpr == null) {
+      // Fallback: refer to first input ref
+      arrayExpr = new RexExpression.InputRef(0);
+    }
+    String columnAlias = null;
+    if (!node.getRowType().getFieldList().isEmpty()) {
+      columnAlias = node.getRowType().getFieldList().get(0).getName();
+    }
+    boolean withOrdinality = false;
+    String ordinalityAlias = null;
+    // Calcite Uncollect exposes withOrdinality via field names if present; if 
>1 fields and last is ordinality
+    if (node.getRowType().getFieldList().size() > 1) {
+      withOrdinality = true;
+      ordinalityAlias = node.getRowType().getFieldList().get(1).getName();
+    }
+    return new UnnestNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), 
NodeHint.EMPTY,
+        convertInputs(node.getInputs()), arrayExpr, columnAlias, 
withOrdinality, ordinalityAlias);
+  }
+
+  private BasePlanNode convertLogicalCorrelate(LogicalCorrelate node) {
+    // Pattern: Correlate(left, Uncollect(Project(correlatedField)))
+    RelNode right = node.getRight();
+    RelDataType leftRowType = node.getLeft().getRowType();
+    Project correlatedProject = findProjectUnderUncollect(right);
+    RexExpression arrayExpr =
+        correlatedProject != null ? deriveArrayExpression(correlatedProject, 
leftRowType) : null;
+    if (arrayExpr == null) {
+      arrayExpr = new RexExpression.InputRef(0);
+    }
+    LogicalFilter correlateFilter = findCorrelateFilter(right);
+    boolean wrapWithFilter = correlateFilter != null;
+    RexNode filterCondition = wrapWithFilter ? correlateFilter.getCondition() 
: null;
+    // Use the entire correlate output schema
+    PlanNode inputNode = toPlanNode(node.getLeft());
+    // Ensure inputs list is mutable because downstream visitors (e.g., 
withInputs methods) may modify the inputs list
+    List<PlanNode> inputs = new ArrayList<>(List.of(inputNode));
+    ElementOrdinalInfo ordinalInfo = deriveElementOrdinalInfo(right, 
leftRowType);
+    boolean withOrdinality = ordinalInfo.hasOrdinality();
+    String elementAlias = ordinalInfo.getElementAlias();
+    String ordinalityAlias = ordinalInfo.getOrdinalityAlias();
+    int elementIndex = ordinalInfo.getElementIndex();
+    int ordinalityIndex = ordinalInfo.getOrdinalityIndex();
+    UnnestNode unnest = new UnnestNode(DEFAULT_STAGE_ID, 
toDataSchema(node.getRowType()), NodeHint.EMPTY,
+        inputs, arrayExpr, elementAlias, withOrdinality, ordinalityAlias, 
elementIndex, ordinalityIndex);
+    if (wrapWithFilter) {
+      // Wrap Unnest with a FilterNode; rewrite filter InputRefs 
(0:elem,1:idx) to absolute output indexes
+      RexExpression rewritten =
+          rewriteInputRefs(RexExpressionUtils.fromRexNode(filterCondition), 
elementIndex, ordinalityIndex);
+      return new FilterNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), 
NodeHint.EMPTY,
+          new ArrayList<>(List.of(unnest)), rewritten);
+    }
+    return unnest;
+  }
+
+  @Nullable
+  private static Project findProjectUnderUncollect(RelNode node) {
+    RelNode current = node;
+    while (current != null) {
+      if (current instanceof Uncollect) {
+        RelNode input = ((Uncollect) current).getInput();
+        return input instanceof Project ? (Project) input : null;
+      }
+      if (current instanceof Project) {
+        current = ((Project) current).getInput();
+      } else if (current instanceof LogicalFilter) {
+        current = ((LogicalFilter) current).getInput();
+      } else {
+        return null;
+      }
+    }
+    return null;
+  }
+
+  @Nullable
+  private static RexExpression deriveArrayExpression(Project project, 
RelDataType leftRowType) {
+    if (project.getProjects().size() != 1) {
+      return null;
+    }
+    RexNode rex = project.getProjects().get(0);
+    Integer idx = resolveInputRefFromCorrel(rex, leftRowType);
+    if (idx != null) {
+      return new RexExpression.InputRef(idx);
+    }
+    RexExpression candidate = RexExpressionUtils.fromRexNode(rex);
+    return candidate instanceof RexExpression.InputRef ? candidate : new 
RexExpression.InputRef(0);
+  }
+
+  @Nullable
+  private static LogicalFilter findCorrelateFilter(RelNode node) {
+    RelNode current = node;
+    while (current instanceof Project || current instanceof LogicalFilter) {
+      if (current instanceof LogicalFilter) {
+        return (LogicalFilter) current;
+      }
+      current = ((Project) current).getInput();
+    }
+    return null;
+  }
+
+  private static ElementOrdinalInfo deriveElementOrdinalInfo(RelNode right, 
RelDataType leftRowType) {
+    ElementOrdinalAccumulator accumulator = new 
ElementOrdinalAccumulator(leftRowType.getFieldCount());
+    if (right instanceof Uncollect) {
+      accumulator.populateFromRowType(right.getRowType());
+    } else if (right instanceof Project) {
+      accumulator.populateFromProject((Project) right);
+    } else if (right instanceof LogicalFilter) {
+      LogicalFilter filter = (LogicalFilter) right;
+      RelNode filterInput = filter.getInput();
+      if (filterInput instanceof Uncollect) {
+        accumulator.populateFromRowType(filter.getRowType());
+      } else if (filterInput instanceof Project) {
+        accumulator.populateFromProject((Project) filterInput);
+      }
+    }
+    return accumulator.toInfo();
+  }
+
+  private static final class ElementOrdinalAccumulator {
+    private final int _base;
+    private String _elementAlias;
+    private String _ordinalityAlias;
+    private int _elementIndex = -1;
+    private int _ordinalityIndex = -1;
+
+    ElementOrdinalAccumulator(int base) {
+      _base = base;
+    }
+
+    void populateFromRowType(RelDataType rowType) {
+      List<RelDataTypeField> fields = rowType.getFieldList();
+      if (!fields.isEmpty() && _elementIndex < 0) {
+        _elementAlias = fields.get(0).getName();
+        _elementIndex = _base;
+      }
+      if (fields.size() > 1 && _ordinalityIndex < 0) {
+        _ordinalityAlias = fields.get(1).getName();
+        _ordinalityIndex = _base + 1;
+      }
+    }
+
+    void populateFromProject(Project project) {
+      List<RexNode> projects = project.getProjects();
+      List<RelDataTypeField> projFields = project.getRowType().getFieldList();
+      for (int j = 0; j < projects.size(); j++) {
+        RexNode pj = projects.get(j);
+        if (pj instanceof RexInputRef) {
+          int idx = ((RexInputRef) pj).getIndex();
+          String outName = projFields.get(j).getName();
+          if (idx == 0 && _elementIndex < 0) {
+            _elementIndex = _base + j;
+            _elementAlias = outName;
+          } else if (idx == 1 && _ordinalityIndex < 0) {
+            _ordinalityIndex = _base + j;
+            _ordinalityAlias = outName;
+          }
+        }
+      }
+    }
+
+    ElementOrdinalInfo toInfo() {
+      return new ElementOrdinalInfo(_elementAlias, _ordinalityAlias, 
_elementIndex, _ordinalityIndex);
+    }
+  }
+
+  private static final class ElementOrdinalInfo {
+    private final String _elementAlias;
+    private final String _ordinalityAlias;
+    private final int _elementIndex;
+    private final int _ordinalityIndex;
+
+    ElementOrdinalInfo(String elementAlias, String ordinalityAlias, int 
elementIndex, int ordinalityIndex) {
+      _elementAlias = elementAlias;
+      _ordinalityAlias = ordinalityAlias;
+      _elementIndex = elementIndex;
+      _ordinalityIndex = ordinalityIndex;
+    }
+
+    String getElementAlias() {
+      return _elementAlias;
+    }
+
+    String getOrdinalityAlias() {
+      return _ordinalityAlias;
+    }
+
+    int getElementIndex() {
+      return _elementIndex;
+    }
+
+    int getOrdinalityIndex() {
+      return _ordinalityIndex;
+    }
+
+    boolean hasOrdinality() {
+      return _ordinalityIndex >= 0;
+    }
+  }
+
+  private static RexExpression rewriteInputRefs(RexExpression expr, int 
elemOutIdx, int ordOutIdx) {
+    if (expr instanceof RexExpression.InputRef) {
+      int idx = ((RexExpression.InputRef) expr).getIndex();
+      if (idx == 0 && elemOutIdx >= 0) {
+        return new RexExpression.InputRef(elemOutIdx);
+      } else if (idx == 1 && ordOutIdx >= 0) {
+        return new RexExpression.InputRef(ordOutIdx);
+      } else {
+        return expr;
+      }
+    } else if (expr instanceof RexExpression.FunctionCall) {
+      RexExpression.FunctionCall fc = (RexExpression.FunctionCall) expr;
+      List<RexExpression> ops = fc.getFunctionOperands();
+      List<RexExpression> rewritten = new ArrayList<>(ops.size());
+      for (RexExpression op : ops) {
+        rewritten.add(rewriteInputRefs(op, elemOutIdx, ordOutIdx));
+      }
+      return new RexExpression.FunctionCall(fc.getDataType(), 
fc.getFunctionName(), rewritten);
+    } else {
+      return expr;
+    }
+  }
+
+  private static Integer resolveInputRefFromCorrel(RexNode expr, RelDataType 
leftRowType) {
+    if (expr instanceof RexFieldAccess) {
+      RexFieldAccess fieldAccess = (RexFieldAccess) expr;
+      if (fieldAccess.getReferenceExpr() instanceof RexCorrelVariable) {
+        String fieldName = fieldAccess.getField().getName();
+        List<RelDataTypeField> fields = leftRowType.getFieldList();
+        for (int i = 0; i < fields.size(); i++) {
+          // SQL field names are case-insensitive, so we must use 
equalsIgnoreCase for correct matching.

Review Comment:
   [nitpick] The existing inline comment on line 410 already explains why 
`equalsIgnoreCase` is used (SQL field names are case-insensitive), but it would 
be more visible if it were positioned before the loop and extended with a short 
example of the mismatched casing it guards against.
   ```suggestion
           // SQL field names are case-insensitive, so we must use 
equalsIgnoreCase for correct matching.
           // This ensures that queries referencing fields with different cases 
(e.g., "UserId" vs "userid") are handled correctly.
           for (int i = 0; i < fields.size(); i++) {
   ```



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java:
##########
@@ -159,6 +169,254 @@ public PlanNode toPlanNode(RelNode node) {
     return result;
   }
 
+  private UnnestNode convertLogicalUncollect(Uncollect node) {
+    // Expect input provides a single array expression (typically a Project 
with one expression)
+    RexExpression arrayExpr = null;
+    RelNode input = node.getInput();
+    if (input instanceof Project) {
+      Project p = (Project) input;
+      if (p.getProjects().size() == 1) {
+        arrayExpr = RexExpressionUtils.fromRexNode(p.getProjects().get(0));
+      }
+    }
+    if (arrayExpr == null) {
+      // Fallback: refer to first input ref
+      arrayExpr = new RexExpression.InputRef(0);
+    }
+    String columnAlias = null;
+    if (!node.getRowType().getFieldList().isEmpty()) {
+      columnAlias = node.getRowType().getFieldList().get(0).getName();
+    }
+    boolean withOrdinality = false;
+    String ordinalityAlias = null;
+    // Calcite Uncollect exposes withOrdinality via field names if present; if 
>1 fields and last is ordinality
+    if (node.getRowType().getFieldList().size() > 1) {
+      withOrdinality = true;
+      ordinalityAlias = node.getRowType().getFieldList().get(1).getName();
+    }
+    return new UnnestNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), 
NodeHint.EMPTY,
+        convertInputs(node.getInputs()), arrayExpr, columnAlias, 
withOrdinality, ordinalityAlias);
+  }
+
+  private BasePlanNode convertLogicalCorrelate(LogicalCorrelate node) {
+    // Pattern: Correlate(left, Uncollect(Project(correlatedField)))
+    RelNode right = node.getRight();
+    RelDataType leftRowType = node.getLeft().getRowType();
+    Project correlatedProject = findProjectUnderUncollect(right);
+    RexExpression arrayExpr =
+        correlatedProject != null ? deriveArrayExpression(correlatedProject, 
leftRowType) : null;
+    if (arrayExpr == null) {
+      arrayExpr = new RexExpression.InputRef(0);
+    }
+    LogicalFilter correlateFilter = findCorrelateFilter(right);
+    boolean wrapWithFilter = correlateFilter != null;
+    RexNode filterCondition = wrapWithFilter ? correlateFilter.getCondition() 
: null;
+    // Use the entire correlate output schema
+    PlanNode inputNode = toPlanNode(node.getLeft());
+    // Ensure inputs list is mutable because downstream visitors (e.g., 
withInputs methods) may modify the inputs list
+    List<PlanNode> inputs = new ArrayList<>(List.of(inputNode));

Review Comment:
   [nitpick] The comment on line 216 explains that the list must be mutable for 
downstream visitors, but the code builds that mutable list by copying an 
immutable single-element list (`List.of(inputNode)`). This is unnecessarily 
indirect—consider `new ArrayList<>(Collections.singletonList(inputNode))`, or 
simply create an empty `ArrayList` and then call `inputs.add(inputNode)`, for 
clarity.
   ```suggestion
       List<PlanNode> inputs = new ArrayList<>();
       inputs.add(inputNode);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to