This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new a130cb2b1e [multistage] Modify empty LogicalProject for window functions to have a literal (#10635) a130cb2b1e is described below commit a130cb2b1e465ad9ad3bdb4428a32fef5bf54cdd Author: Sonam Mandal <soman...@linkedin.com> AuthorDate: Mon Apr 24 12:55:25 2023 -0700 [multistage] Modify empty LogicalProject for window functions to have a literal (#10635) * [multistage] Modify empty LogicalProject for window functions to have a literal * Address review comments * Address review comments * Address review comments * Rebase and fix ROW_NUMBER tests * Empty-Commit --- .../apache/calcite/rel/rules/PinotRuleUtils.java | 9 +++ .../rules/PinotWindowExchangeNodeInsertRule.java | 83 ++++++++++++++++++++++ .../pinot/query/QueryEnvironmentTestBase.java | 2 + .../resources/queries/WindowFunctionPlans.json | 37 ++++++++-- .../test/resources/queries/WindowFunctions.json | 46 +++++++++++- 5 files changed, 170 insertions(+), 7 deletions(-) diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java index d2634018ef..6562e642b9 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java +++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java @@ -22,6 +22,7 @@ import org.apache.calcite.plan.Contexts; import org.apache.calcite.plan.hep.HepRelVertex; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Exchange; +import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.RelFactories; import org.apache.calcite.tools.RelBuilder; import org.apache.calcite.tools.RelBuilderFactory; @@ -45,4 +46,12 @@ public class PinotRuleUtils { } return reference instanceof Exchange; } + + public static boolean isProject(RelNode rel) { + RelNode reference = rel; + if (reference instanceof HepRelVertex) { + 
reference = ((HepRelVertex) reference).getCurrentRel(); + } + return reference instanceof Project; + } } diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java index 1532feca6d..ae1ae851b8 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java @@ -21,18 +21,30 @@ package org.apache.calcite.rel.rules; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; +import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.hep.HepRelVertex; import org.apache.calcite.rel.RelDistributions; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.Window; import org.apache.calcite.rel.logical.LogicalExchange; +import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalWindow; import org.apache.calcite.rel.logical.PinotLogicalSortExchange; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.tools.RelBuilderFactory; @@ -81,6 +93,18 @@ public class PinotWindowExchangeNodeInsertRule extends RelOptRule { if (windowGroup.keys.isEmpty() && 
windowGroup.orderKeys.getKeys().isEmpty()) { // Empty OVER() // Add a single LogicalExchange for empty OVER() since no sort is required + + if (PinotRuleUtils.isProject(windowInput)) { + // Check for empty LogicalProject below LogicalWindow. If present modify it to be a Literal only project and add + // a project above + Project project = (Project) ((HepRelVertex) windowInput).getCurrentRel(); + if (project.getProjects().isEmpty()) { + RelNode returnedRelNode = handleEmptyProjectBelowWindow(window, project); + call.transformTo(returnedRelNode); + return; + } + } + LogicalExchange exchange = LogicalExchange.create(windowInput, RelDistributions.hash(Collections.emptyList())); call.transformTo( LogicalWindow.create(window.getTraitSet(), exchange, window.constants, window.getRowType(), window.groups)); @@ -179,4 +203,63 @@ public class PinotWindowExchangeNodeInsertRule extends RelOptRule { } return isPartitionByOnly; } + + /** + * Only empty OVER() type queries using window functions that take no columns as arguments can result in a situation + * where the LogicalProject below the LogicalWindow is an empty LogicalProject (i.e. no columns are projected). + * The 'ProjectWindowTransposeRule' looks at the columns present in the LogicalProject above the LogicalWindow and + * LogicalWindow to decide what to add to the lower LogicalProject when it does the transpose and for such queries + * if nothing is referenced an empty LogicalProject gets created. Some example queries where this can occur are: + * + * SELECT COUNT(*) OVER() from tableName + * SELECT 42, COUNT(*) OVER() from tableName + * SELECT ROW_NUMBER() OVER() from tableName + * + * This function modifies the empty LogicalProject below the LogicalWindow to add a literal and adds a LogicalProject + * above LogicalWindow to remove the additional literal column from being projected any further. This also handles + * the addition of the LogicalExchange under the LogicalWindow. 
+ * + * TODO: Explore an option to handle empty LogicalProject by actually projecting empty rows for each entry. This way + * there will no longer be a need to add a literal to the empty LogicalProject; traversing the rows will be + * enough + */ + private RelNode handleEmptyProjectBelowWindow(Window window, Project project) { + RelOptCluster cluster = window.getCluster(); + RexBuilder rexBuilder = cluster.getRexBuilder(); + + // Construct the project that goes below the window (which projects a literal) + final List<RexNode> expsForProjectBelowWindow = Collections.singletonList( + rexBuilder.makeLiteral(0, cluster.getTypeFactory().createSqlType(SqlTypeName.INTEGER))); + final List<String> expsFieldNamesBelowWindow = Collections.singletonList("winLiteral"); + Project projectBelowWindow = LogicalProject.create(project.getInput(), project.getHints(), + expsForProjectBelowWindow, expsFieldNamesBelowWindow); + + // Fix up the inputs to the Window to include the literal column and add an exchange + final RelDataTypeFactory.Builder outputBuilder = cluster.getTypeFactory().builder(); + outputBuilder.addAll(projectBelowWindow.getRowType().getFieldList()); + outputBuilder.addAll(window.getRowType().getFieldList()); + + // This scenario is only possible for empty OVER() which uses functions that have no arguments such as COUNT(*) or + // ROW_NUMBER(). 
Add a LogicalExchange with empty hash distribution list + LogicalExchange exchange = + LogicalExchange.create(projectBelowWindow, RelDistributions.hash(Collections.emptyList())); + Window newWindow = new LogicalWindow(window.getCluster(), window.getTraitSet(), exchange, + window.getConstants(), outputBuilder.build(), window.groups); + + // Create the LogicalProject above window to remove the literal column + final List<RexNode> expsForProjectAboveWindow = new ArrayList<>(); + final List<String> expsFieldNamesAboveWindow = new ArrayList<>(); + final List<RelDataTypeField> rowTypeWindowInput = newWindow.getRowType().getFieldList(); + + int count = 0; + for (int index = 1; index < rowTypeWindowInput.size(); index++) { + // Keep only the non-literal fields. We can start from index = 1 since the first and only column from the lower + // project is the literal column added above. + final RelDataTypeField relDataTypeField = rowTypeWindowInput.get(index); + expsForProjectAboveWindow.add(new RexInputRef(index, relDataTypeField.getType())); + expsFieldNamesAboveWindow.add(String.format("$%d", count)); + } + + return LogicalProject.create(newWindow, project.getHints(), expsForProjectAboveWindow, expsFieldNamesAboveWindow); + } } diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java index 9b72e32613..c5b328e3f2 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java @@ -104,6 +104,8 @@ public class QueryEnvironmentTestBase { + " WHERE a.col3 >= 0 GROUP BY a.col2, a.col3"}, new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 WHERE a.col2 IN ('foo', 'bar') AND" + " b.col2 NOT IN ('alice', 'charlie')"}, + new Object[]{"SELECT COUNT(*) OVER() FROM a"}, + new Object[]{"SELECT 42, COUNT(*) OVER() 
FROM a"}, new Object[]{"SELECT a.col1, SUM(a.col3) OVER () FROM a"}, new Object[]{"SELECT a.col1, SUM(a.col3) OVER (PARTITION BY a.col2) FROM a"}, new Object[]{"SELECT a.col1, SUM(a.col3) OVER (PARTITION BY a.col2 ORDER BY a.col2) FROM a"}, diff --git a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json index 0be92124b7..84678eaf39 100644 --- a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json +++ b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json @@ -14,16 +14,43 @@ "\n" ] }, + { + "description": "single empty OVER() count(*) only", + "sql": "EXPLAIN PLAN FOR SELECT COUNT(*) OVER() FROM a", + "output": [ + "Execution Plan", + "\nLogicalProject($0=[$1])", + "\n LogicalWindow(window#0=[window(aggs [COUNT()])])", + "\n LogicalExchange(distribution=[hash])", + "\n LogicalProject(winLiteral=[0])", + "\n LogicalTableScan(table=[[a]])", + "\n" + ] + }, + { + "description": "single empty OVER() literal, count(*) only", + "sql": "EXPLAIN PLAN FOR SELECT 42, COUNT(*) OVER() FROM a", + "output": [ + "Execution Plan", + "\nLogicalProject(EXPR$0=[42], EXPR$1=[$0])", + "\n LogicalProject($0=[$1])", + "\n LogicalWindow(window#0=[window(aggs [COUNT()])])", + "\n LogicalExchange(distribution=[hash])", + "\n LogicalProject(winLiteral=[0])", + "\n LogicalTableScan(table=[[a]])", + "\n" + ] + }, { "description": "single empty OVER() only row_number", "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER() FROM a", - "notes": "TODO: ROW_NUMBER() with empty OVER() and no other columns in select results in the leaf level LogicalProject not projecting any rows. 
This is incorrect since we need to project at least one column for assigning ROW_NUMBERS", "output": [ "Execution Plan", - "\nLogicalWindow(window#0=[window( rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])", - "\n LogicalExchange(distribution=[hash])", - "\n LogicalProject", - "\n LogicalTableScan(table=[[a]])", + "\nLogicalProject($0=[$1])", + "\n LogicalWindow(window#0=[window( rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])", + "\n LogicalExchange(distribution=[hash])", + "\n LogicalProject(winLiteral=[0])", + "\n LogicalTableScan(table=[[a]])", "\n" ] }, diff --git a/pinot-query-runtime/src/test/resources/queries/WindowFunctions.json b/pinot-query-runtime/src/test/resources/queries/WindowFunctions.json index 638079d7f4..5caa982bec 100644 --- a/pinot-query-runtime/src/test/resources/queries/WindowFunctions.json +++ b/pinot-query-runtime/src/test/resources/queries/WindowFunctions.json @@ -51,11 +51,53 @@ [768] ] }, + { + "description": "Single empty OVER() count(*) without select col", + "sql": "SELECT COUNT(*) OVER() FROM {tbl}", + "outputs": [ + [16], + [16], + [16], + [16], + [16], + [16], + [16], + [16], + [16], + [16], + [16], + [16], + [16], + [16], + [16], + [16] + ] + }, + { + "description": "Single empty OVER() count(*) without select col", + "sql": "SELECT 42, COUNT(*) OVER() FROM {tbl}", + "outputs": [ + [42, 16], + [42, 16], + [42, 16], + [42, 16], + [42, 16], + [42, 16], + [42, 16], + [42, 16], + [42, 16], + [42, 16], + [42, 16], + [42, 16], + [42, 16], + [42, 16], + [42, 16], + [42, 16] + ] + }, { "description": "Single empty OVER() row_number", "sql": "SELECT ROW_NUMBER() OVER() FROM {tbl}", - "comments": "ROW_NUMBER() window function without any other columns in select or OVER or filter clauses fails as nothing is projected from the leaf", - "expectedException": "(?s)Received error query execution result block:.*", "outputs": [ [1], [2], 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org