This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 630f94da4c [multistage][bugfix] allow having clause in leaf-stage 
return extra columns (#9947)
630f94da4c is described below

commit 630f94da4c333403ce9449f9783f550b39982787
Author: Rong Rong <ro...@apache.org>
AuthorDate: Thu Dec 8 15:47:03 2022 -0800

    [multistage][bugfix] allow having clause in leaf-stage return extra columns 
(#9947)
    
    * allow having clause in leaf-stage return extra columns
    * fix error message
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../pinot/common/datablock/DataBlockUtils.java     |  2 +-
 .../pinot/common/exception/QueryException.java     |  1 +
 .../apache/pinot/query/runtime/QueryRunner.java    |  2 +-
 .../LeafStageTransferableBlockOperator.java        | 32 +++++++---------------
 .../LeafStageTransferableBlockOperatorTest.java    | 22 ++++++++++++++-
 .../src/test/resources/queries/Aggregates.json     |  4 ++-
 6 files changed, 37 insertions(+), 26 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
index 963d49ab74..8b41969e8e 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
@@ -50,7 +50,7 @@ public final class DataBlockUtils {
     while (t.getMessage() == null) {
       t = t.getCause();
     }
-    return QueryException.getTruncatedStackTrace(t);
+    return t.getMessage() + "\n" + QueryException.getTruncatedStackTrace(t);
   }
 
   public static MetadataBlock getErrorDataBlock(Map<Integer, String> 
exceptions) {
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index a4176f65fe..5b2910a300 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -174,6 +174,7 @@ public class QueryException {
     return copiedProcessingException;
   }
 
+  // TODO: getTruncatedStackTrace(Throwable) should always be preceded by t.getMessage();
   public static String getTruncatedStackTrace(Throwable t) {
     StringWriter stringWriter = new StringWriter();
     t.printStackTrace(new PrintWriter(stringWriter));
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 968fbb7f7b..54e733e27b 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -202,7 +202,7 @@ public class QueryRunner {
     } catch (Exception e) {
       InstanceResponseBlock errorResponse = new InstanceResponseBlock();
       
errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE,
-          QueryException.getTruncatedStackTrace(e));
+          e.getMessage() + QueryException.getTruncatedStackTrace(e));
       return errorResponse;
     }
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 4b24b62f3a..41f8b4535e 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -25,7 +25,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.PriorityQueue;
-import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.DataBlockUtils;
@@ -40,7 +39,6 @@ import 
org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
-import 
org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 
@@ -148,12 +146,10 @@ public class LeafStageTransferableBlockOperator extends 
BaseOperator<Transferabl
   @SuppressWarnings("ConstantConditions")
   private static TransferableBlock 
composeDistinctTransferableBlock(InstanceResponseBlock responseBlock,
       DataSchema desiredDataSchema) {
-    List<String> selectionColumns = Arrays.asList(
-        ((DistinctAggregationFunction) 
responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns());
-    int[] columnIndices = 
SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema);
-    Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct 
table schema for leaf stage."
-        + " Expected: " + desiredDataSchema + ". Actual Columns: " + 
selectionColumns
-        + " Column Ordering: " + Arrays.toString(columnIndices));
+    DataSchema resultSchema = responseBlock.getDataSchema();
+    
Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+        resultSchema.getColumnDataTypes()), "Incompatible selection result 
data schema: "
+        + " Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
     return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
   }
 
@@ -168,13 +164,9 @@ public class LeafStageTransferableBlockOperator extends 
BaseOperator<Transferabl
   private static TransferableBlock 
composeGroupByTransferableBlock(InstanceResponseBlock responseBlock,
       DataSchema desiredDataSchema) {
     DataSchema resultSchema = responseBlock.getDataSchema();
-    // GROUP-BY column names conforms with selection expression
-    List<String> selectionColumns = 
responseBlock.getQueryContext().getSelectExpressions().stream()
-        .map(e -> e.toString()).collect(Collectors.toList());
-    int[] columnIndices = 
SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
-    Preconditions.checkState(inOrder(columnIndices), "Incompatible group by 
result schema for leaf stage."
-        + " Expected: " + desiredDataSchema + ". Actual: " + resultSchema
-        + " Column Ordering: " + Arrays.toString(columnIndices));
+    
Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+        resultSchema.getColumnDataTypes()), "Incompatible selection result 
data schema: "
+        + " Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
     return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
   }
 
@@ -190,13 +182,9 @@ public class LeafStageTransferableBlockOperator extends 
BaseOperator<Transferabl
   private static TransferableBlock 
composeAggregationTransferableBlock(InstanceResponseBlock responseBlock,
       DataSchema desiredDataSchema) {
     DataSchema resultSchema = responseBlock.getDataSchema();
-    // AGG-ONLY column names are derived from AggFunction.getColumnName()
-    List<String> selectionColumns = 
Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map(
-        a -> a.getColumnName()).collect(Collectors.toList());
-    int[] columnIndices = 
SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
-    Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate 
result schema for leaf stage."
-        + " Expected: " + desiredDataSchema + ". Actual: " + resultSchema
-        + " Column Ordering: " + Arrays.toString(columnIndices));
+    
Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+        resultSchema.getColumnDataTypes()), "Incompatible selection result 
data schema: "
+        + " Expected: " + desiredDataSchema + ". Actual: " + resultSchema);
     return composeDirectTransferableBlock(responseBlock, desiredDataSchema);
   }
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
index 60ea7c722b..587b8c5247 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
@@ -208,7 +208,7 @@ public class LeafStageTransferableBlockOperatorTest {
     // Given:
     QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
         "SELECT intCol, count(*), sum(doubleCol), strCol FROM tbl GROUP BY 
strCol, intCol");
-    // result schema doesn't match with DISTINCT columns using GROUP BY.
+    // result schema doesn't match with columns ordering using GROUP BY, this 
should not occur.
     DataSchema schema = new DataSchema(new String[]{"intCol", "count(*)", 
"sum(doubleCol)", "strCol"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, 
DataSchema.ColumnDataType.INT,
             DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.STRING});
@@ -223,6 +223,26 @@ public class LeafStageTransferableBlockOperatorTest {
     Assert.assertFalse(resultBlock.isErrorBlock());
   }
 
+  @Test
+  public void 
shouldNotErrorOutWhenQueryContextAskForGroupByOutOfOrderWithHaving() {
+    // Given:
+    QueryContext queryContext = 
QueryContextConverterUtils.getQueryContext("SELECT strCol, intCol, count(*), "
+        + "sum(doubleCol) FROM tbl GROUP BY strCol, intCol HAVING 
sum(doubleCol) < 10 AND count(*) > 0");
+    // result schema contains duplicate reference from agg and having. it will 
repeat itself.
+    DataSchema schema = new DataSchema(new String[]{"strCol", "intCol", 
"count(*)", "sum(doubleCol)", "sum(doubleCol)"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG, 
DataSchema.ColumnDataType.LONG});
+    List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
+        new InstanceResponseBlock(new GroupByResultsBlock(schema, 
Collections.emptyList()), queryContext));
+    LeafStageTransferableBlockOperator operator = new 
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+
+    // When:
+    TransferableBlock resultBlock = operator.nextBlock();
+
+    // Then:
+    Assert.assertFalse(resultBlock.isErrorBlock());
+  }
+
   @Test
   public void shouldNotErrorOutWhenDealingWithAggregationResults() {
     // Given:
diff --git a/pinot-query-runtime/src/test/resources/queries/Aggregates.json 
b/pinot-query-runtime/src/test/resources/queries/Aggregates.json
index 53b0fbf4f6..7e2f7e4e9c 100644
--- a/pinot-query-runtime/src/test/resources/queries/Aggregates.json
+++ b/pinot-query-runtime/src/test/resources/queries/Aggregates.json
@@ -255,7 +255,9 @@
       { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY 
upper(string_col) ORDER BY upper(string_col)" },
       { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY 
upper(string_col) ORDER BY count(int_col)" },
       { "sql": "SELECT * FROM (SELECT string_col, bool_col, min(int_col) AS m, 
count(int_col), count(*) AS c FROM {tbl} GROUP BY string_col, bool_col ORDER BY 
string_col) WHERE c < m" },
-      { "sql": "SELECT * FROM (SELECT string_col, bool_col, min(int_col) AS m, 
count(int_col), count(*) AS c FROM {tbl} GROUP BY string_col, bool_col ORDER BY 
bool_col, string_col) WHERE c < m" }
+      { "sql": "SELECT * FROM (SELECT string_col, bool_col, min(int_col) AS m, 
count(int_col), count(*) AS c FROM {tbl} GROUP BY string_col, bool_col ORDER BY 
bool_col, string_col) WHERE c < m" },
+      { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY 
upper(string_col) HAVING sum(int_col) > 0 ORDER BY upper(string_col)" },
+      { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY 
upper(string_col) HAVING sum(int_col) >= 0 AND count(int_col) >= 0 ORDER BY 
count(int_col)" }
     ]
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to