This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 630f94da4c [multistage][bugfix] allow having clause in leaf-stage return extra columns (#9947) 630f94da4c is described below commit 630f94da4c333403ce9449f9783f550b39982787 Author: Rong Rong <ro...@apache.org> AuthorDate: Thu Dec 8 15:47:03 2022 -0800 [multistage][bugfix] allow having clause in leaf-stage return extra columns (#9947) * allow having clause in leaf-stage return extra columns * fix error message Co-authored-by: Rong Rong <ro...@startree.ai> --- .../pinot/common/datablock/DataBlockUtils.java | 2 +- .../pinot/common/exception/QueryException.java | 1 + .../apache/pinot/query/runtime/QueryRunner.java | 2 +- .../LeafStageTransferableBlockOperator.java | 32 +++++++--------------- .../LeafStageTransferableBlockOperatorTest.java | 22 ++++++++++++++- .../src/test/resources/queries/Aggregates.json | 4 ++- 6 files changed, 37 insertions(+), 26 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java index 963d49ab74..8b41969e8e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java @@ -50,7 +50,7 @@ public final class DataBlockUtils { while (t.getMessage() == null) { t = t.getCause(); } - return QueryException.getTruncatedStackTrace(t); + return t.getMessage() + "\n" + QueryException.getTruncatedStackTrace(t); } public static MetadataBlock getErrorDataBlock(Map<Integer, String> exceptions) { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java index a4176f65fe..5b2910a300 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java @@ -174,6 +174,7 @@ public class QueryException { return copiedProcessingException; } + // TODO: getTruncatedStackTrace(Throwable) always precede by t.getMessage(); public static String getTruncatedStackTrace(Throwable t) { StringWriter stringWriter = new StringWriter(); t.printStackTrace(new PrintWriter(stringWriter)); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 968fbb7f7b..54e733e27b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -202,7 +202,7 @@ public class QueryRunner { } catch (Exception e) { InstanceResponseBlock errorResponse = new InstanceResponseBlock(); errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE, - QueryException.getTruncatedStackTrace(e)); + e.getMessage() + QueryException.getTruncatedStackTrace(e)); return errorResponse; } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java index 4b24b62f3a..41f8b4535e 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java @@ -25,7 +25,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.PriorityQueue; -import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.datablock.DataBlockUtils; @@ -40,7 +39,6 @@ import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock; import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; -import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction; import org.apache.pinot.core.query.selection.SelectionOperatorUtils; import org.apache.pinot.query.runtime.blocks.TransferableBlock; @@ -148,12 +146,10 @@ public class LeafStageTransferableBlockOperator extends BaseOperator<Transferabl @SuppressWarnings("ConstantConditions") private static TransferableBlock composeDistinctTransferableBlock(InstanceResponseBlock responseBlock, DataSchema desiredDataSchema) { - List<String> selectionColumns = Arrays.asList( - ((DistinctAggregationFunction) responseBlock.getQueryContext().getAggregationFunctions()[0]).getColumns()); - int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, desiredDataSchema); - Preconditions.checkState(inOrder(columnIndices), "Incompatible distinct table schema for leaf stage." - + " Expected: " + desiredDataSchema + ". Actual Columns: " + selectionColumns - + " Column Ordering: " + Arrays.toString(columnIndices)); + DataSchema resultSchema = responseBlock.getDataSchema(); + Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(), + resultSchema.getColumnDataTypes()), "Incompatible selection result data schema: " + + " Expected: " + desiredDataSchema + ". Actual: " + resultSchema); return composeDirectTransferableBlock(responseBlock, desiredDataSchema); } @@ -168,13 +164,9 @@ public class LeafStageTransferableBlockOperator extends BaseOperator<Transferabl private static TransferableBlock composeGroupByTransferableBlock(InstanceResponseBlock responseBlock, DataSchema desiredDataSchema) { DataSchema resultSchema = responseBlock.getDataSchema(); - // GROUP-BY column names conforms with selection expression - List<String> selectionColumns = responseBlock.getQueryContext().getSelectExpressions().stream() - .map(e -> e.toString()).collect(Collectors.toList()); - int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema); - Preconditions.checkState(inOrder(columnIndices), "Incompatible group by result schema for leaf stage." - + " Expected: " + desiredDataSchema + ". Actual: " + resultSchema - + " Column Ordering: " + Arrays.toString(columnIndices)); + Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(), + resultSchema.getColumnDataTypes()), "Incompatible selection result data schema: " + + " Expected: " + desiredDataSchema + ". Actual: " + resultSchema); return composeDirectTransferableBlock(responseBlock, desiredDataSchema); } @@ -190,13 +182,9 @@ public class LeafStageTransferableBlockOperator extends BaseOperator<Transferabl private static TransferableBlock composeAggregationTransferableBlock(InstanceResponseBlock responseBlock, DataSchema desiredDataSchema) { DataSchema resultSchema = responseBlock.getDataSchema(); - // AGG-ONLY column names are derived from AggFunction.getColumnName() - List<String> selectionColumns = Arrays.stream(responseBlock.getQueryContext().getAggregationFunctions()).map( - a -> a.getColumnName()).collect(Collectors.toList()); - int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema); - Preconditions.checkState(inOrder(columnIndices), "Incompatible aggregate result schema for leaf stage." - + " Expected: " + desiredDataSchema + ". Actual: " + resultSchema - + " Column Ordering: " + Arrays.toString(columnIndices)); + Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(), + resultSchema.getColumnDataTypes()), "Incompatible selection result data schema: " + + " Expected: " + desiredDataSchema + ". Actual: " + resultSchema); return composeDirectTransferableBlock(responseBlock, desiredDataSchema); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java index 60ea7c722b..587b8c5247 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java @@ -208,7 +208,7 @@ public class LeafStageTransferableBlockOperatorTest { // Given: QueryContext queryContext = QueryContextConverterUtils.getQueryContext( "SELECT intCol, count(*), sum(doubleCol), strCol FROM tbl GROUP BY strCol, intCol"); - // result schema doesn't match with DISTINCT columns using GROUP BY. + // result schema doesn't match with columns ordering using GROUP BY, this should not occur. DataSchema schema = new DataSchema(new String[]{"intCol", "count(*)", "sum(doubleCol)", "strCol"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.STRING}); @@ -223,6 +223,26 @@ public class LeafStageTransferableBlockOperatorTest { Assert.assertFalse(resultBlock.isErrorBlock()); } + @Test + public void shouldNotErrorOutWhenQueryContextAskForGroupByOutOfOrderWithHaving() { + // Given: + QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT strCol, intCol, count(*), " + + "sum(doubleCol) FROM tbl GROUP BY strCol, intCol HAVING sum(doubleCol) < 10 AND count(*) > 0"); + // result schema contains duplicate reference from agg and having. it will repeat itself. + DataSchema schema = new DataSchema(new String[]{"strCol", "intCol", "count(*)", "sum(doubleCol)", "sum(doubleCol)"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG}); + List<InstanceResponseBlock> resultsBlockList = Collections.singletonList( + new InstanceResponseBlock(new GroupByResultsBlock(schema, Collections.emptyList()), queryContext)); + LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(resultsBlockList, schema); + + // When: + TransferableBlock resultBlock = operator.nextBlock(); + + // Then: + Assert.assertFalse(resultBlock.isErrorBlock()); + } + @Test public void shouldNotErrorOutWhenDealingWithAggregationResults() { // Given: diff --git a/pinot-query-runtime/src/test/resources/queries/Aggregates.json b/pinot-query-runtime/src/test/resources/queries/Aggregates.json index 53b0fbf4f6..7e2f7e4e9c 100644 --- a/pinot-query-runtime/src/test/resources/queries/Aggregates.json +++ b/pinot-query-runtime/src/test/resources/queries/Aggregates.json @@ -255,7 +255,9 @@ { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) ORDER BY upper(string_col)" }, { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) ORDER BY count(int_col)" }, { "sql": "SELECT * FROM (SELECT string_col, bool_col, min(int_col) AS m, count(int_col), count(*) AS c FROM {tbl} GROUP BY string_col, bool_col ORDER BY string_col) WHERE c < m" }, - { "sql": "SELECT * FROM (SELECT string_col, bool_col, min(int_col) AS m, count(int_col), count(*) AS c FROM {tbl} GROUP BY string_col, bool_col ORDER BY bool_col, string_col) WHERE c < m" } + { "sql": "SELECT * FROM (SELECT string_col, bool_col, min(int_col) AS m, count(int_col), count(*) AS c FROM {tbl} GROUP BY string_col, bool_col ORDER BY bool_col, string_col) WHERE c < m" }, + { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) HAVING sum(int_col) > 0 ORDER BY upper(string_col)" }, + { "sql": "SELECT upper(string_col), count(int_col) FROM {tbl} GROUP BY upper(string_col) HAVING sum(int_col) >= 0 AND count(int_col) >= 0 ORDER BY count(int_col)" } ] } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org