This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 281478e819 Enabling LogicalProject pushdown optimizations to eliminate exchange of unused columns (#14198) 281478e819 is described below commit 281478e81951ce804ba890042002080dfbd201de Author: Shaurya Chaturvedi <shauryach...@users.noreply.github.com> AuthorDate: Tue Oct 22 16:56:28 2024 -0700 Enabling LogicalProject pushdown optimizations to eliminate exchange of unused columns (#14198) --- .../calcite/rel/rules/PinotQueryRuleSets.java | 8 ++ .../org/apache/pinot/query/QueryEnvironment.java | 6 ++ .../src/test/resources/queries/JoinPlans.json | 98 +++++++++++++++++----- .../resources/queries/WindowFunctionPlans.json | 4 +- 4 files changed, 94 insertions(+), 22 deletions(-) diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java index 45f867c11b..5bc55835e7 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java @@ -101,6 +101,14 @@ public class PinotQueryRuleSets { CoreRules.FILTER_PROJECT_TRANSPOSE ); + // Project pushdown rules run using a RuleCollection since we want to push down a project as much as possible in a + // single HepInstruction. + public static final List<RelOptRule> PROJECT_PUSHDOWN_RULES = List.of( + CoreRules.PROJECT_FILTER_TRANSPOSE, + CoreRules.PROJECT_JOIN_TRANSPOSE, + CoreRules.PROJECT_MERGE + ); + // The pruner rules run top-down to ensure Calcite restarts from root node after applying a transformation. public static final List<RelOptRule> PRUNE_RULES = List.of( CoreRules.AGGREGATE_PROJECT_MERGE, diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index 1681d41c94..629c7ae2c5 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -427,6 +427,12 @@ public class QueryEnvironment { // Pushdown filters using a single HepInstruction. hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES); + // Pushdown projects after first filter pushdown to minimize projected columns. + hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.PROJECT_PUSHDOWN_RULES); + + // Pushdown filters again since filter should be pushed down at the lowest level, after project pushdown. + hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES); + // ---- // Prune duplicate/unnecessary nodes using a single HepInstruction. // TODO: We can consider using HepMatchOrder.TOP_DOWN if we find cases where it would help. diff --git a/pinot-query-planner/src/test/resources/queries/JoinPlans.json b/pinot-query-planner/src/test/resources/queries/JoinPlans.json index 168c3ceaa3..45fcf62251 100644 --- a/pinot-query-planner/src/test/resources/queries/JoinPlans.json +++ b/pinot-query-planner/src/test/resources/queries/JoinPlans.json @@ -272,14 +272,14 @@ "output": [ "Execution Plan", "\nLogicalProject(col1=[$0], col2=[$1])", - "\n LogicalFilter(condition=[IS NOT TRUE($8)])", - "\n LogicalJoin(condition=[=($6, $7)], joinType=[left])", - "\n PinotLogicalExchange(distribution=[hash[6]])", - "\n LogicalProject(col1=[$0], col2=[$1], col30=[$3], $f1=[$4], col32=[$5], $f10=[$7], col34=[$2])", - "\n LogicalFilter(condition=[IS NOT TRUE($7)])", - "\n LogicalJoin(condition=[=($5, $6)], joinType=[left])", - "\n PinotLogicalExchange(distribution=[hash[5]])", - "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2], col30=[$3], $f1=[$5], col32=[$2])", + "\n LogicalFilter(condition=[IS NOT TRUE($4)])", + "\n LogicalJoin(condition=[=($2, $3)], joinType=[left])", + "\n PinotLogicalExchange(distribution=[hash[2]])", + "\n LogicalProject(col1=[$0], col2=[$1], col34=[$2])", + "\n LogicalFilter(condition=[IS NOT TRUE($5)])", + "\n LogicalJoin(condition=[=($3, $4)], joinType=[left])", + "\n PinotLogicalExchange(distribution=[hash[3]])", + "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2], col32=[$2])", "\n LogicalFilter(condition=[IS NOT TRUE($5)])", "\n LogicalJoin(condition=[=($3, $4)], joinType=[left])", "\n PinotLogicalExchange(distribution=[hash[3]])", @@ -294,19 +294,21 @@ "\n LogicalFilter(condition=[=($0, _UTF-8'foo')])", "\n LogicalTableScan(table=[[default, b]])", "\n PinotLogicalExchange(distribution=[hash[0]])", - "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", - "\n PinotLogicalExchange(distribution=[hash[0]])", - "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", - "\n LogicalProject(col3=[$2], $f1=[true])", - "\n LogicalFilter(condition=[=($0, _UTF-8'bar')])", - "\n LogicalTableScan(table=[[default, b]])", + "\n LogicalProject(col3=[$0], $f1=[$1])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n LogicalProject(col3=[$2], $f1=[true])", + "\n LogicalFilter(condition=[=($0, _UTF-8'bar')])", + "\n LogicalTableScan(table=[[default, b]])", "\n PinotLogicalExchange(distribution=[hash[0]])", - "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", - "\n PinotLogicalExchange(distribution=[hash[0]])", - "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", - "\n LogicalProject(col3=[$2], $f1=[true])", - "\n LogicalFilter(condition=[=($0, _UTF-8'foobar')])", - "\n LogicalTableScan(table=[[default, b]])", + "\n LogicalProject(col3=[$0], $f1=[$1])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n LogicalProject(col3=[$2], $f1=[true])", + "\n LogicalFilter(condition=[=($0, _UTF-8'foobar')])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -517,6 +519,62 @@ "\n LogicalTableScan(table=[[default, a]])", "\n" ] + }, + { + "description": "Multiple IN and NOT IN joins while selecting count at top", + "sql": "EXPLAIN PLAN FOR SELECT count(*) FROM a WHERE a.col1 = 'foo' AND col2 = 'xylo' AND a.col4 = 12 AND a.col5 = false AND col3 NOT IN (SELECT col3 FROM b WHERE col1='foo') AND col3 NOT IN (SELECT col3 FROM b WHERE col1='bar') AND col3 NOT IN (SELECT col3 FROM b WHERE col1='foobar') AND col3 IN (SELECT col3 FROM b WHERE col1 = 'fork')", + "output": [ + "Execution Plan", + "\nPinotLogicalAggregate(group=[{}], agg#0=[COUNT($0)])", + "\n PinotLogicalExchange(distribution=[hash])", + "\n PinotLogicalAggregate(group=[{}], agg#0=[COUNT()])", + "\n LogicalJoin(condition=[=($0, $1)], joinType=[semi])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n LogicalProject(col3=[$0])", + "\n LogicalFilter(condition=[IS NOT TRUE($3)])", + "\n LogicalJoin(condition=[=($1, $2)], joinType=[left])", + "\n PinotLogicalExchange(distribution=[hash[1]])", + "\n LogicalProject(col3=[$0], col34=[$0])", + "\n LogicalFilter(condition=[IS NOT TRUE($3)])", + "\n LogicalJoin(condition=[=($1, $2)], joinType=[left])", + "\n PinotLogicalExchange(distribution=[hash[1]])", + "\n LogicalProject(col3=[$0], col32=[$0])", + "\n LogicalFilter(condition=[IS NOT TRUE($3)])", + "\n LogicalJoin(condition=[=($1, $2)], joinType=[left])", + "\n PinotLogicalExchange(distribution=[hash[1]])", + "\n LogicalProject(col3=[$2], col30=[$2])", + "\n LogicalFilter(condition=[AND(=($0, _UTF-8'foo'), =($1, _UTF-8'xylo'), =($3, 12), NOT($4))])", + "\n LogicalTableScan(table=[[default, a]])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n LogicalProject(col3=[$0], $f1=[$1])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n LogicalProject(col3=[$2], $f1=[true])", + "\n LogicalFilter(condition=[=($0, _UTF-8'foo')])", + "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n LogicalProject(col3=[$0], $f1=[$1])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n LogicalProject(col3=[$2], $f1=[true])", + "\n LogicalFilter(condition=[=($0, _UTF-8'bar')])", + "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n LogicalProject(col3=[$0], $f1=[$1])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n LogicalProject(col3=[$2], $f1=[true])", + "\n LogicalFilter(condition=[=($0, _UTF-8'foobar')])", + "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n LogicalProject(col3=[$2])", + "\n LogicalFilter(condition=[=($0, _UTF-8'fork')])", + "\n LogicalTableScan(table=[[default, b]])", + "\n" + ] } ] }, diff --git a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json index ac8ef92784..191dea2fdf 100644 --- a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json +++ b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json @@ -3404,7 +3404,7 @@ "sql": "EXPLAIN PLAN FOR WITH windowfunc AS (SELECT a.col1, ROW_NUMBER() OVER(PARTITION BY a.col2 ORDER BY a.col3) as rownum from a) SELECT a.col1, a.rownum FROM windowfunc AS a where a.rownum < 5", "output": [ "Execution Plan", - "\nLogicalProject(col1=[$0], $1=[$3])", + "\nLogicalProject(col1=[$0], w0$o0=[$3])", "\n LogicalFilter(condition=[<($3, 5)])", "\n LogicalWindow(window#0=[window(partition {1} order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])", "\n PinotLogicalSortExchange(distribution=[hash[1]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])", @@ -3418,7 +3418,7 @@ "sql": "EXPLAIN PLAN FOR WITH windowfunc AS (SELECT a.col1, RANK() OVER(PARTITION BY a.col2 ORDER BY a.col3) as rank, DENSE_RANK() OVER(PARTITION BY a.col2 ORDER BY a.col3) as dense_rank from a) SELECT a.col1, a.rank, a.dense_rank FROM windowfunc AS a where a.dense_rank < 5", "output": [ "Execution Plan", - "\nLogicalProject(col1=[$0], $1=[$3], $2=[$4])", + "\nLogicalProject(col1=[$0], w0$o0=[$3], w0$o1=[$4])", "\n LogicalFilter(condition=[<($4, 5)])", "\n LogicalWindow(window#0=[window(partition {1} order by [2] aggs [RANK(), DENSE_RANK()])])", "\n PinotLogicalSortExchange(distribution=[hash[1]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org