Copilot commented on code in PR #17168:
URL: https://github.com/apache/pinot/pull/17168#discussion_r2507320256


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java:
##########
@@ -159,6 +168,77 @@ public PlanNode toPlanNode(RelNode node) {
     return result;
   }
 
+  private UnnestNode convertLogicalUncollect(Uncollect node) {
+    // Expect input provides a single array expression (typically a Project 
with one expression)
+    RexExpression arrayExpr = null;
+    RelNode input = node.getInput();
+    if (input instanceof Project) {
+      Project p = (Project) input;
+      if (p.getProjects().size() == 1) {
+        arrayExpr = RexExpressionUtils.fromRexNode(p.getProjects().get(0));
+      }
+    }
+    if (arrayExpr == null) {
+      // Fallback: refer to first input ref
+      arrayExpr = new RexExpression.InputRef(0);
+    }
+    String columnAlias = null;
+    if (!node.getRowType().getFieldList().isEmpty()) {
+      columnAlias = node.getRowType().getFieldList().get(0).getName();
+    }
+    return new UnnestNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), 
NodeHint.EMPTY,
+        convertInputs(node.getInputs()), arrayExpr, columnAlias);
+  }
+
+  private UnnestNode convertLogicalCorrelate(LogicalCorrelate node) {
+    // Pattern: Correlate(left, Uncollect(Project(correlatedField)))
+    RexExpression arrayExpr = null;
+    RelNode right = node.getRight();
+    if (right instanceof Uncollect) {
+      RelNode rInput = ((Uncollect) right).getInput();
+      if (rInput instanceof Project) {
+        Project p = (Project) rInput;
+        if (p.getProjects().size() == 1) {
+          RexNode rex = p.getProjects().get(0);
+          Integer idx = resolveInputRefFromCorrel(rex, 
node.getLeft().getRowType());
+          if (idx != null) {
+            arrayExpr = new RexExpression.InputRef(idx);
+          } else {
+            arrayExpr = RexExpressionUtils.fromRexNode(rex);
+            if (!(arrayExpr instanceof RexExpression.InputRef)) {
+              arrayExpr = new RexExpression.InputRef(0);
+            }
+          }
+        }
+      }
+    }
+    if (arrayExpr == null) {
+      arrayExpr = new RexExpression.InputRef(0);
+    }
+    // Use the entire correlate output schema
+    PlanNode inputNode = toPlanNode(node.getLeft());
+    // Ensure inputs list is mutable for downstream visitors (e.g., 
PlanFragmenter.process uses replaceAll)

Review Comment:
   The comment mentions 'PlanFragmenter.process uses replaceAll', but this is 
misleading. The actual reason is that `withInputs` methods in various visitors 
may modify the inputs list. Consider stating that downstream visitors may 
modify the inputs list in general, rather than citing the `replaceAll` 
operation specifically.
   ```suggestion
       // Ensure inputs list is mutable because downstream visitors (e.g., 
withInputs methods) may modify the inputs list
   ```



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/UnnestIntegrationTest.java:
##########
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.io.File;
+import java.util.List;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class UnnestIntegrationTest extends 
CustomDataQueryClusterIntegrationTest {
+
+  private static final String DEFAULT_TABLE_NAME = "UnnestIntegrationTest";
+  private static final String INT_COLUMN = "intCol";
+  private static final String LONG_COLUMN = "longCol";
+  private static final String FLOAT_COLUMN = "floatCol";
+  private static final String DOUBLE_COLUMN = "doubleCol";
+  private static final String STRING_COLUMN = "stringCol";
+  private static final String TIMESTAMP_COLUMN = "timestampCol";
+  private static final String GROUP_BY_COLUMN = "groupKey";
+  private static final String LONG_ARRAY_COLUMN = "longArrayCol";
+  private static final String DOUBLE_ARRAY_COLUMN = "doubleArrayCol";
+  private static final String STRING_ARRAY_COLUMN = "stringArrayCol";
+
+  @Test(dataProvider = "useV2QueryEngine")
+  public void testCountWithCrossJoinUnnest(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query = String.format("SELECT COUNT(*) FROM %s CROSS JOIN 
UNNEST(longArrayCol) AS u(elem)", getTableName());
+    JsonNode json = postQuery(query);
+    System.out.println("json = " + json);
+    JsonNode rows = json.get("resultTable").get("rows");
+    assertNotNull(rows);
+    long count = rows.get(0).get(0).asLong();
+    assertEquals(count, 4 * getCountStarResult());
+  }
+
+  @Test(dataProvider = "useV2QueryEngine")
+  public void testSelectWithCrossJoinUnnest(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query = String.format("SELECT intCol, u.elem FROM %s CROSS JOIN 
UNNEST(stringArrayCol) AS u(elem)"
+        + " ORDER BY intCol", getTableName());
+    JsonNode json = postQuery(query);
+    System.out.println("json = " + json);

Review Comment:
   Debug print statements should not be committed. Remove this 
`System.out.println` or replace it with proper logging using SLF4J if needed 
for debugging.
   ```suggestion
   
   ```



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java:
##########
@@ -159,6 +168,77 @@ public PlanNode toPlanNode(RelNode node) {
     return result;
   }
 
+  private UnnestNode convertLogicalUncollect(Uncollect node) {
+    // Expect input provides a single array expression (typically a Project 
with one expression)
+    RexExpression arrayExpr = null;
+    RelNode input = node.getInput();
+    if (input instanceof Project) {
+      Project p = (Project) input;
+      if (p.getProjects().size() == 1) {
+        arrayExpr = RexExpressionUtils.fromRexNode(p.getProjects().get(0));
+      }
+    }
+    if (arrayExpr == null) {
+      // Fallback: refer to first input ref
+      arrayExpr = new RexExpression.InputRef(0);
+    }
+    String columnAlias = null;
+    if (!node.getRowType().getFieldList().isEmpty()) {
+      columnAlias = node.getRowType().getFieldList().get(0).getName();
+    }
+    return new UnnestNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), 
NodeHint.EMPTY,
+        convertInputs(node.getInputs()), arrayExpr, columnAlias);
+  }
+
+  private UnnestNode convertLogicalCorrelate(LogicalCorrelate node) {
+    // Pattern: Correlate(left, Uncollect(Project(correlatedField)))
+    RexExpression arrayExpr = null;
+    RelNode right = node.getRight();
+    if (right instanceof Uncollect) {
+      RelNode rInput = ((Uncollect) right).getInput();
+      if (rInput instanceof Project) {
+        Project p = (Project) rInput;
+        if (p.getProjects().size() == 1) {
+          RexNode rex = p.getProjects().get(0);
+          Integer idx = resolveInputRefFromCorrel(rex, 
node.getLeft().getRowType());
+          if (idx != null) {
+            arrayExpr = new RexExpression.InputRef(idx);
+          } else {
+            arrayExpr = RexExpressionUtils.fromRexNode(rex);
+            if (!(arrayExpr instanceof RexExpression.InputRef)) {
+              arrayExpr = new RexExpression.InputRef(0);
+            }
+          }
+        }
+      }
+    }
+    if (arrayExpr == null) {
+      arrayExpr = new RexExpression.InputRef(0);
+    }
+    // Use the entire correlate output schema
+    PlanNode inputNode = toPlanNode(node.getLeft());
+    // Ensure inputs list is mutable for downstream visitors (e.g., 
PlanFragmenter.process uses replaceAll)
+    List<PlanNode> inputs = new ArrayList<>(List.of(inputNode));
+    return new UnnestNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), 
NodeHint.EMPTY,
+        inputs, arrayExpr, null);
+  }
+
+  private static Integer resolveInputRefFromCorrel(RexNode expr, RelDataType 
leftRowType) {
+    if (expr instanceof RexFieldAccess) {
+      RexFieldAccess fieldAccess = (RexFieldAccess) expr;
+      if (fieldAccess.getReferenceExpr() instanceof RexCorrelVariable) {
+        String fieldName = fieldAccess.getField().getName();
+        List<RelDataTypeField> fields = leftRowType.getFieldList();

Review Comment:
   This performs a case-insensitive comparison inside a loop over every field. 
Consider building a map with case-insensitive keys if this method is called 
frequently, or document why case-insensitive matching is required here. If 
exact-case matching is acceptable, use `.equals()` instead of 
`.equalsIgnoreCase()` for better performance.
   ```suggestion
           List<RelDataTypeField> fields = leftRowType.getFieldList();
           // SQL field names are case-insensitive, so we must use 
equalsIgnoreCase for correct matching.
   ```



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnnestOperator.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.plannode.UnnestNode;
+import org.apache.pinot.query.runtime.blocks.MseBlock;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
+import 
org.apache.pinot.query.runtime.operator.operands.TransformOperandFactory;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * UnnestOperator expands an array/collection value per input row into zero or 
more output rows.
+ * The output schema is provided by the associated UnnestNode's data schema.
+ */
+public class UnnestOperator extends MultiStageOperator {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(UnnestOperator.class);
+  private static final String EXPLAIN_NAME = "UNNEST";
+
+  private final MultiStageOperator _input;
+  private final TransformOperand _arrayExprOperand;
+  private final DataSchema _resultSchema;
+  private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
+
+  public UnnestOperator(OpChainExecutionContext context, MultiStageOperator 
input, DataSchema inputSchema,
+      UnnestNode node) {
+    super(context);
+    _input = input;
+    _arrayExprOperand = 
TransformOperandFactory.getTransformOperand(node.getArrayExpr(), inputSchema);
+    _resultSchema = node.getDataSchema();
+  }
+
+  @Override
+  public void registerExecution(long time, int numRows, long memoryUsedBytes, 
long gcTimeMs) {
+    _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
+    _statMap.merge(StatKey.EMITTED_ROWS, numRows);
+    _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+    _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
+  }
+
+  @Override
+  protected Logger logger() {
+    return LOGGER;
+  }
+
+  @Override
+  public List<MultiStageOperator> getChildOperators() {
+    return List.of(_input);
+  }
+
+  @Override
+  public Type getOperatorType() {
+    return Type.UNNEST;
+  }
+
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected MseBlock getNextBlock() {
+    MseBlock block = _input.nextBlock();
+    if (block.isEos()) {
+      return block;
+    }
+    MseBlock.Data dataBlock = (MseBlock.Data) block;
+    List<Object[]> inputRows = dataBlock.asRowHeap().getRows();
+    List<Object[]> outRows = new ArrayList<>();
+
+    for (Object[] row : inputRows) {
+      Object value = _arrayExprOperand.apply(row);
+      if (value == null) {
+        continue;
+      }
+      if (value instanceof List) {
+        for (Object element : (List<?>) value) {
+          outRows.add(appendElement(row, element));
+        }
+      } else if (value.getClass().isArray()) {
+        int length = java.lang.reflect.Array.getLength(value);
+        for (int i = 0; i < length; i++) {
+          Object element = java.lang.reflect.Array.get(value, i);
+          outRows.add(appendElement(row, element));
+        }
+      } else {
+        // If not array-like, treat as a single element
+        outRows.add(appendElement(row, value));

Review Comment:
   The fallback behavior of treating non-array values as single elements is 
undocumented and potentially unexpected. Add Javadoc to the `getNextBlock()` 
method explaining this behavior, or consider throwing an error for unsupported 
types so that they do not silently produce incorrect results.



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/UnnestIntegrationTest.java:
##########
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.io.File;
+import java.util.List;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class UnnestIntegrationTest extends 
CustomDataQueryClusterIntegrationTest {
+
+  private static final String DEFAULT_TABLE_NAME = "UnnestIntegrationTest";
+  private static final String INT_COLUMN = "intCol";
+  private static final String LONG_COLUMN = "longCol";
+  private static final String FLOAT_COLUMN = "floatCol";
+  private static final String DOUBLE_COLUMN = "doubleCol";
+  private static final String STRING_COLUMN = "stringCol";
+  private static final String TIMESTAMP_COLUMN = "timestampCol";
+  private static final String GROUP_BY_COLUMN = "groupKey";
+  private static final String LONG_ARRAY_COLUMN = "longArrayCol";
+  private static final String DOUBLE_ARRAY_COLUMN = "doubleArrayCol";
+  private static final String STRING_ARRAY_COLUMN = "stringArrayCol";
+
+  @Test(dataProvider = "useV2QueryEngine")
+  public void testCountWithCrossJoinUnnest(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query = String.format("SELECT COUNT(*) FROM %s CROSS JOIN 
UNNEST(longArrayCol) AS u(elem)", getTableName());
+    JsonNode json = postQuery(query);
+    System.out.println("json = " + json);

Review Comment:
   Debug print statements should not be committed. Remove this 
`System.out.println` or replace it with proper logging using SLF4J if needed 
for debugging.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to