Copilot commented on code in PR #17168:
URL: https://github.com/apache/pinot/pull/17168#discussion_r2557769736
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java:
##########
@@ -159,6 +169,257 @@ public PlanNode toPlanNode(RelNode node) {
return result;
}
+ private UnnestNode convertLogicalUncollect(Uncollect node) {
+ // Expect input provides a single array expression (typically a Project
with one expression)
+ RexExpression arrayExpr = null;
+ RelNode input = node.getInput();
+ if (input instanceof Project) {
+ Project p = (Project) input;
+ if (p.getProjects().size() == 1) {
+ arrayExpr = RexExpressionUtils.fromRexNode(p.getProjects().get(0));
+ }
+ }
+ if (arrayExpr == null) {
+ // Fallback: refer to first input ref
+ arrayExpr = new RexExpression.InputRef(0);
+ }
+ String columnAlias = null;
+ if (!node.getRowType().getFieldList().isEmpty()) {
+ columnAlias = node.getRowType().getFieldList().get(0).getName();
+ }
+ boolean withOrdinality = false;
+ String ordinalityAlias = null;
+ // Calcite Uncollect exposes withOrdinality via field names if present; if
>1 fields and last is ordinality
+ if (node.getRowType().getFieldList().size() > 1) {
+ withOrdinality = true;
+ ordinalityAlias = node.getRowType().getFieldList().get(1).getName();
+ }
+ return new UnnestNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()),
NodeHint.EMPTY,
+ convertInputs(node.getInputs()), arrayExpr, columnAlias,
withOrdinality, ordinalityAlias);
+ }
+
+ private BasePlanNode convertLogicalCorrelate(LogicalCorrelate node) {
+ // Pattern: Correlate(left, Uncollect(Project(correlatedField)))
+ RelNode right = node.getRight();
+ RelDataType leftRowType = node.getLeft().getRowType();
+ Project correlatedProject = findProjectUnderUncollect(right);
+ RexExpression arrayExpr =
+ correlatedProject != null ? deriveArrayExpression(correlatedProject,
leftRowType) : null;
+ if (arrayExpr == null) {
+ arrayExpr = new RexExpression.InputRef(0);
+ }
+ LogicalFilter correlateFilter = findCorrelateFilter(right);
+ boolean wrapWithFilter = correlateFilter != null;
+ RexNode filterCondition = wrapWithFilter ? correlateFilter.getCondition()
: null;
+ // Use the entire correlate output schema
+ PlanNode inputNode = toPlanNode(node.getLeft());
+ // Ensure inputs list is mutable because downstream visitors (e.g.,
withInputs methods) may modify the inputs list
+ List<PlanNode> inputs = new ArrayList<>();
+ inputs.add(inputNode);
+ ElementOrdinalInfo ordinalInfo = deriveElementOrdinalInfo(right,
leftRowType);
+ boolean withOrdinality = ordinalInfo.hasOrdinality();
+ String elementAlias = ordinalInfo.getElementAlias();
+ String ordinalityAlias = ordinalInfo.getOrdinalityAlias();
+ int elementIndex = ordinalInfo.getElementIndex();
+ int ordinalityIndex = ordinalInfo.getOrdinalityIndex();
+ UnnestNode unnest = new UnnestNode(DEFAULT_STAGE_ID,
toDataSchema(node.getRowType()), NodeHint.EMPTY,
+ inputs, arrayExpr, elementAlias, withOrdinality, ordinalityAlias,
elementIndex, ordinalityIndex);
+ if (wrapWithFilter) {
+ // Wrap Unnest with a FilterNode; rewrite filter InputRefs
(0:elem,1:idx) to absolute output indexes
+ RexExpression rewritten =
+ rewriteInputRefs(RexExpressionUtils.fromRexNode(filterCondition),
elementIndex, ordinalityIndex);
+ return new FilterNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()),
NodeHint.EMPTY,
+ new ArrayList<>(List.of(unnest)), rewritten);
+ }
+ return unnest;
+ }
+
+ @Nullable
+ private static Project findProjectUnderUncollect(RelNode node) {
+ RelNode current = node;
+ while (current != null) {
+ if (current instanceof Uncollect) {
+ RelNode input = ((Uncollect) current).getInput();
+ return input instanceof Project ? (Project) input : null;
+ }
+ if (current instanceof Project) {
+ current = ((Project) current).getInput();
+ } else if (current instanceof LogicalFilter) {
+ current = ((LogicalFilter) current).getInput();
+ } else {
+ return null;
+ }
+ }
+ return null;
+ }
+
+ @Nullable
+ private static RexExpression deriveArrayExpression(Project project,
RelDataType leftRowType) {
+ if (project.getProjects().size() != 1) {
+ return null;
+ }
+ RexNode rex = project.getProjects().get(0);
+ Integer idx = resolveInputRefFromCorrel(rex, leftRowType);
+ if (idx != null) {
+ return new RexExpression.InputRef(idx);
+ }
+ RexExpression candidate = RexExpressionUtils.fromRexNode(rex);
+ return candidate instanceof RexExpression.InputRef ? candidate : new
RexExpression.InputRef(0);
+ }
+
+ @Nullable
+ private static LogicalFilter findCorrelateFilter(RelNode node) {
+ RelNode current = node;
+ while (current instanceof Project || current instanceof LogicalFilter) {
+ if (current instanceof LogicalFilter) {
+ return (LogicalFilter) current;
+ }
+ current = ((Project) current).getInput();
+ }
+ return null;
+ }
+
+ private static ElementOrdinalInfo deriveElementOrdinalInfo(RelNode right,
RelDataType leftRowType) {
+ ElementOrdinalAccumulator accumulator = new
ElementOrdinalAccumulator(leftRowType.getFieldCount());
+ if (right instanceof Uncollect) {
+ accumulator.populateFromRowType(right.getRowType());
+ } else if (right instanceof Project) {
+ accumulator.populateFromProject((Project) right);
+ } else if (right instanceof LogicalFilter) {
+ LogicalFilter filter = (LogicalFilter) right;
+ RelNode filterInput = filter.getInput();
+ if (filterInput instanceof Uncollect) {
+ accumulator.populateFromRowType(filter.getRowType());
+ } else if (filterInput instanceof Project) {
+ accumulator.populateFromProject((Project) filterInput);
+ }
+ }
+ return accumulator.toInfo();
+ }
+
+ private static final class ElementOrdinalAccumulator {
+ private final int _base;
+ private String _elementAlias;
+ private String _ordinalityAlias;
+ private int _elementIndex = -1;
+ private int _ordinalityIndex = -1;
+
+ ElementOrdinalAccumulator(int base) {
+ _base = base;
+ }
+
+ void populateFromRowType(RelDataType rowType) {
+ List<RelDataTypeField> fields = rowType.getFieldList();
+ if (!fields.isEmpty() && _elementIndex < 0) {
+ _elementAlias = fields.get(0).getName();
+ _elementIndex = _base;
+ }
+ if (fields.size() > 1 && _ordinalityIndex < 0) {
+ _ordinalityAlias = fields.get(1).getName();
+ _ordinalityIndex = _base + 1;
+ }
+ }
+
+ void populateFromProject(Project project) {
+ List<RexNode> projects = project.getProjects();
+ List<RelDataTypeField> projFields = project.getRowType().getFieldList();
+ for (int j = 0; j < projects.size(); j++) {
+ RexNode pj = projects.get(j);
+ if (pj instanceof RexInputRef) {
+ int idx = ((RexInputRef) pj).getIndex();
+ String outName = projFields.get(j).getName();
+ if (idx == 0 && _elementIndex < 0) {
+ _elementIndex = _base + j;
+ _elementAlias = outName;
+ } else if (idx == 1 && _ordinalityIndex < 0) {
+ _ordinalityIndex = _base + j;
+ _ordinalityAlias = outName;
+ }
+ }
+ }
+ }
+
+ ElementOrdinalInfo toInfo() {
+ return new ElementOrdinalInfo(_elementAlias, _ordinalityAlias,
_elementIndex, _ordinalityIndex);
+ }
+ }
+
+ private static final class ElementOrdinalInfo {
+ private final String _elementAlias;
+ private final String _ordinalityAlias;
+ private final int _elementIndex;
+ private final int _ordinalityIndex;
+
+ ElementOrdinalInfo(String elementAlias, String ordinalityAlias, int
elementIndex, int ordinalityIndex) {
+ _elementAlias = elementAlias;
+ _ordinalityAlias = ordinalityAlias;
+ _elementIndex = elementIndex;
+ _ordinalityIndex = ordinalityIndex;
+ }
+
+ String getElementAlias() {
+ return _elementAlias;
+ }
+
+ String getOrdinalityAlias() {
+ return _ordinalityAlias;
+ }
+
+ int getElementIndex() {
+ return _elementIndex;
+ }
+
+ int getOrdinalityIndex() {
+ return _ordinalityIndex;
+ }
+
+ boolean hasOrdinality() {
+ return _ordinalityIndex >= 0;
+ }
+ }
+
+ private static RexExpression rewriteInputRefs(RexExpression expr, int
elemOutIdx, int ordOutIdx) {
+ if (expr instanceof RexExpression.InputRef) {
+ int idx = ((RexExpression.InputRef) expr).getIndex();
+ if (idx == 0 && elemOutIdx >= 0) {
+ return new RexExpression.InputRef(elemOutIdx);
+ } else if (idx == 1 && ordOutIdx >= 0) {
+ return new RexExpression.InputRef(ordOutIdx);
+ } else {
+ return expr;
+ }
+ } else if (expr instanceof RexExpression.FunctionCall) {
+ RexExpression.FunctionCall fc = (RexExpression.FunctionCall) expr;
+ List<RexExpression> ops = fc.getFunctionOperands();
+ List<RexExpression> rewritten = new ArrayList<>(ops.size());
+ for (RexExpression op : ops) {
+ rewritten.add(rewriteInputRefs(op, elemOutIdx, ordOutIdx));
+ }
+ return new RexExpression.FunctionCall(fc.getDataType(),
fc.getFunctionName(), rewritten);
+ } else {
+ return expr;
+ }
+ }
+
+ private static Integer resolveInputRefFromCorrel(RexNode expr, RelDataType
leftRowType) {
+ if (expr instanceof RexFieldAccess) {
+ RexFieldAccess fieldAccess = (RexFieldAccess) expr;
+ if (fieldAccess.getReferenceExpr() instanceof RexCorrelVariable) {
+ String fieldName = fieldAccess.getField().getName();
+ List<RelDataTypeField> fields = leftRowType.getFieldList();
+ // SQL field names are case-insensitive, so we must use
equalsIgnoreCase for correct matching.
+ // This ensures that queries referencing fields with different cases
(e.g., "UserId" vs "userid") are handled
+ // correctly.
Review Comment:
While the comment explains the use of case-insensitive comparison, it would
be more complete to note that this assumes Calcite's default case-sensitivity
settings. If the schema uses a case-sensitive configuration, this approach
might produce incorrect results. Consider documenting this assumption or
checking the schema's case sensitivity setting.
```suggestion
// SQL field names are case-insensitive by default in Calcite, so we
use equalsIgnoreCase for matching.
// NOTE: This assumes that the schema is configured with Calcite's
default case-insensitivity.
// If the schema is case-sensitive, this approach may produce
incorrect results. Update logic if needed.
```
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnnestOperator.java:
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.plannode.UnnestNode;
+import org.apache.pinot.query.runtime.blocks.MseBlock;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
+import
org.apache.pinot.query.runtime.operator.operands.TransformOperandFactory;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * UnnestOperator expands an array/collection value per input row into zero or
more output rows.
+ * The output schema is provided by the associated UnnestNode's data schema.
+ */
+public class UnnestOperator extends MultiStageOperator {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(UnnestOperator.class);
+ private static final String EXPLAIN_NAME = "UNNEST";
+
+ private final MultiStageOperator _input;
+ private final TransformOperand _arrayExprOperand;
+ private final DataSchema _resultSchema;
+ private final boolean _withOrdinality;
+ private final int _elementIndex;
+ private final int _ordinalityIndex;
+ private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
+
+ public UnnestOperator(OpChainExecutionContext context, MultiStageOperator
input, DataSchema inputSchema,
+ UnnestNode node) {
+ super(context);
+ _input = input;
+ _arrayExprOperand =
TransformOperandFactory.getTransformOperand(node.getArrayExpr(), inputSchema);
+ _resultSchema = node.getDataSchema();
+ _withOrdinality = node.isWithOrdinality();
+ _elementIndex = node.getElementIndex();
+ _ordinalityIndex = node.getOrdinalityIndex();
+ }
+
+ @Override
+ public void registerExecution(long time, int numRows, long memoryUsedBytes,
long gcTimeMs) {
+ _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
+ _statMap.merge(StatKey.EMITTED_ROWS, numRows);
+ _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+ _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
+ }
+
+ @Override
+ protected Logger logger() {
+ return LOGGER;
+ }
+
+ @Override
+ public List<MultiStageOperator> getChildOperators() {
+ return List.of(_input);
+ }
+
+ @Override
+ public Type getOperatorType() {
+ return Type.UNNEST;
+ }
+
+ @Override
+ public String toExplainString() {
+ return EXPLAIN_NAME;
+ }
+
+ @Override
+ protected MseBlock getNextBlock() {
+ MseBlock block = _input.nextBlock();
+ if (block.isEos()) {
+ return block;
+ }
+ MseBlock.Data dataBlock = (MseBlock.Data) block;
+ List<Object[]> inputRows = dataBlock.asRowHeap().getRows();
+ List<Object[]> outRows = new ArrayList<>();
+
+ for (Object[] row : inputRows) {
+ Object value = _arrayExprOperand.apply(row);
+ int ord = 1;
+ if (value == null) {
+ continue;
+ }
+ if (value instanceof List) {
+ for (Object element : (List<?>) value) {
+ outRows.add(appendElement(row, element, ord++));
+ }
+ } else if (value.getClass().isArray()) {
+ int length = java.lang.reflect.Array.getLength(value);
+ if (value instanceof int[]) {
+ int[] arr = (int[]) value;
+ for (int i = 0; i < length; i++) {
+ outRows.add(appendElement(row, arr[i], ord++));
+ }
+ } else if (value instanceof long[]) {
+ long[] arr = (long[]) value;
+ for (int i = 0; i < length; i++) {
+ outRows.add(appendElement(row, arr[i], ord++));
+ }
+ } else if (value instanceof double[]) {
+ double[] arr = (double[]) value;
+ for (int i = 0; i < length; i++) {
+ outRows.add(appendElement(row, arr[i], ord++));
+ }
+ } else if (value instanceof boolean[]) {
+ boolean[] arr = (boolean[]) value;
+ for (int i = 0; i < length; i++) {
+ outRows.add(appendElement(row, arr[i], ord++));
+ }
+ } else if (value instanceof char[]) {
+ char[] arr = (char[]) value;
+ for (int i = 0; i < length; i++) {
+ outRows.add(appendElement(row, arr[i], ord++));
+ }
+ } else if (value instanceof short[]) {
+ short[] arr = (short[]) value;
+ for (int i = 0; i < length; i++) {
+ outRows.add(appendElement(row, arr[i], ord++));
+ }
+ } else if (value instanceof byte[]) {
+ byte[] arr = (byte[]) value;
+ for (int i = 0; i < length; i++) {
+ outRows.add(appendElement(row, arr[i], ord++));
+ }
+ } else if (value instanceof String[]) {
+ String[] arr = (String[]) value;
+ for (int i = 0; i < length; i++) {
+ outRows.add(appendElement(row, arr[i], ord++));
+ }
+ } else if (value instanceof Object[]) {
+ Object[] arr = (Object[]) value;
+ for (int i = 0; i < length; i++) {
+ outRows.add(appendElement(row, arr[i], ord++));
+ }
+ } else {
Review Comment:
The code computes `length` using reflection but then uses it redundantly.
For primitive array types (int[], long[], etc.), the length can be obtained
directly from the array (e.g., `arr.length`) which is more efficient than
reflection. Consider moving the length calculation inside each branch or using
direct array length access for known primitive types.
```suggestion
if (value instanceof int[]) {
int[] arr = (int[]) value;
for (int i = 0; i < arr.length; i++) {
outRows.add(appendElement(row, arr[i], ord++));
}
} else if (value instanceof long[]) {
long[] arr = (long[]) value;
for (int i = 0; i < arr.length; i++) {
outRows.add(appendElement(row, arr[i], ord++));
}
} else if (value instanceof double[]) {
double[] arr = (double[]) value;
for (int i = 0; i < arr.length; i++) {
outRows.add(appendElement(row, arr[i], ord++));
}
} else if (value instanceof boolean[]) {
boolean[] arr = (boolean[]) value;
for (int i = 0; i < arr.length; i++) {
outRows.add(appendElement(row, arr[i], ord++));
}
} else if (value instanceof char[]) {
char[] arr = (char[]) value;
for (int i = 0; i < arr.length; i++) {
outRows.add(appendElement(row, arr[i], ord++));
}
} else if (value instanceof short[]) {
short[] arr = (short[]) value;
for (int i = 0; i < arr.length; i++) {
outRows.add(appendElement(row, arr[i], ord++));
}
} else if (value instanceof byte[]) {
byte[] arr = (byte[]) value;
for (int i = 0; i < arr.length; i++) {
outRows.add(appendElement(row, arr[i], ord++));
}
} else if (value instanceof String[]) {
String[] arr = (String[]) value;
for (int i = 0; i < arr.length; i++) {
outRows.add(appendElement(row, arr[i], ord++));
}
} else if (value instanceof Object[]) {
Object[] arr = (Object[]) value;
for (int i = 0; i < arr.length; i++) {
outRows.add(appendElement(row, arr[i], ord++));
}
} else {
int length = java.lang.reflect.Array.getLength(value);
```
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java:
##########
@@ -219,6 +222,23 @@ private static ExplainedNode
deserializeExplainedNode(Plan.PlanNode protoNode) {
extractInputs(protoNode), protoExplainNode.getTitle(),
protoExplainNode.getAttributesMap());
}
+ private static UnnestNode deserializeUnnestNode(Plan.PlanNode protoNode) {
+ Plan.UnnestNode protoUnnestNode = protoNode.getUnnestNode();
+ String alias = protoUnnestNode.getColumnAlias();
+ if (alias != null && alias.isEmpty()) {
+ alias = null;
+ }
+ String ordAlias = protoUnnestNode.getOrdinalityAlias();
+ if (ordAlias != null && ordAlias.isEmpty()) {
+ ordAlias = null;
+ }
+ int elemIdx = protoUnnestNode.hasElementIndex() ?
protoUnnestNode.getElementIndex() : -1;
+ int ordIdx = protoUnnestNode.hasOrdinalityIndex() ?
protoUnnestNode.getOrdinalityIndex() : -1;
Review Comment:
The deserialization logic uses -1 as a sentinel value for missing indices,
but this is not documented in the UnnestNode class. Consider adding a constant
(e.g., `UNSPECIFIED_INDEX = -1`) to UnnestNode to make this convention explicit
and avoid magic numbers.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]