This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ac32016b35 Low-risk optProgram rule enhancements (#16035) ac32016b35 is described below commit ac32016b350c9c7571b0c3661709929f8e7714e1 Author: Song Fu <131259315+songw...@users.noreply.github.com> AuthorDate: Wed Jun 11 13:44:42 2025 -0700 Low-risk optProgram rule enhancements (#16035) --- .../PinotJoinPushTransitivePredicatesRule.java | 81 ++++++++++++++ .../calcite/rel/rules/PinotQueryRuleSets.java | 9 +- .../apache/pinot/query/QueryCompilationTest.java | 116 +++++++++++++++++++++ .../resources/queries/ExplainPhysicalPlans.json | 12 +-- .../src/test/resources/queries/JoinPlans.json | 14 +-- 5 files changed, 219 insertions(+), 13 deletions(-) diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinPushTransitivePredicatesRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinPushTransitivePredicatesRule.java new file mode 100644 index 0000000000..24d11d2b53 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinPushTransitivePredicatesRule.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.calcite.rel.rules; + +import org.apache.calcite.plan.RelOptPredicateList; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.rules.JoinPushTransitivePredicatesRule; +import org.apache.calcite.tools.RelBuilder; +import org.apache.pinot.calcite.rel.hint.PinotHintOptions; + + +public class PinotJoinPushTransitivePredicatesRule extends JoinPushTransitivePredicatesRule { + + protected PinotJoinPushTransitivePredicatesRule(Config config) { + super(config); + } + + public static final PinotJoinPushTransitivePredicatesRule INSTANCE + = new PinotJoinPushTransitivePredicatesRule(Config.DEFAULT); + + // The following code is copied from Calcite and modified to not push filters into the right side of a lookup join. + //@formatter:off + @Override public void onMatch(RelOptRuleCall call) { + Join join = call.rel(0); + final RelMetadataQuery mq = call.getMetadataQuery(); + RelOptPredicateList preds = mq.getPulledUpPredicates(join); + + if (preds.leftInferredPredicates.isEmpty() + && preds.rightInferredPredicates.isEmpty()) { + return; + } + + final RelBuilder relBuilder = call.builder(); + + RelNode left = join.getLeft(); + if (!preds.leftInferredPredicates.isEmpty()) { + RelNode curr = left; + left = relBuilder.push(left) + .filter(preds.leftInferredPredicates).build(); + call.getPlanner().onCopy(curr, left); + } + + // PINOT MODIFICATION: do not push inferred predicates into the right side of a lookup join.
+ boolean canPushRight = !PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join); + + RelNode right = join.getRight(); + if (canPushRight && !preds.rightInferredPredicates.isEmpty()) { + RelNode curr = right; + right = relBuilder.push(right) + .filter(preds.rightInferredPredicates).build(); + call.getPlanner().onCopy(curr, right); + } + + RelNode newRel = + join.copy(join.getTraitSet(), join.getCondition(), left, right, + join.getJoinType(), join.isSemiJoinDone()); + call.getPlanner().onCopy(join, newRel); + + call.transformTo(newRel); + } + //@formatter:on +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java index 1e80f94e9e..238bcf89f2 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java @@ -75,6 +75,10 @@ public class PinotQueryRuleSets { CoreRules.PROJECT_TO_SEMI_JOIN, PinotSeminJoinDistinctProjectRule.INSTANCE, + // Consider semijoin optimizations first before push transitive predicate + // Pinot version doesn't push predicates to the right in case of lookup join + PinotJoinPushTransitivePredicatesRule.INSTANCE, + // convert non-all union into all-union + distinct CoreRules.UNION_TO_DISTINCT, @@ -97,7 +101,8 @@ public class PinotQueryRuleSets { // Filter pushdown rules run using a RuleCollection since we want to push down a filter as much as possible in a // single HepInstruction. 
public static final List<RelOptRule> FILTER_PUSHDOWN_RULES = List.of( - CoreRules.FILTER_INTO_JOIN, + // The Pinot version does not push filters into the right side of a lookup join + PinotFilterIntoJoinRule.INSTANCE, CoreRules.FILTER_AGGREGATE_TRANSPOSE, CoreRules.FILTER_SET_OP_TRANSPOSE, CoreRules.FILTER_PROJECT_TRANSPOSE @@ -118,6 +123,8 @@ public class PinotQueryRuleSets { CoreRules.FILTER_MERGE, CoreRules.AGGREGATE_REMOVE, CoreRules.SORT_REMOVE, + PruneEmptyRules.CORRELATE_LEFT_INSTANCE, + PruneEmptyRules.CORRELATE_RIGHT_INSTANCE, PruneEmptyRules.AGGREGATE_INSTANCE, PruneEmptyRules.FILTER_INSTANCE, PruneEmptyRules.JOIN_LEFT_INSTANCE, diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java index a955727711..265cbd0748 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java @@ -41,6 +41,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.ProjectNode; import org.apache.pinot.query.routing.QueryServerInstance; import org.testng.annotations.DataProvider; +import org.testng.annotations.Ignore; import org.testng.annotations.Test; import static org.testng.Assert.*; @@ -94,6 +95,121 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase { //@formatter:on } + @Test + public void testAggregateCaseToFilter2() { + // same rewrite with a constant THEN value: "SELECT SUM(CASE WHEN col1 = 'a' THEN 3 ELSE 0 END) FROM a" + // becomes a $SUM0 over the constant 3 with FILTER (col1 = 'a'), as in the LEAF aggregate below + String query = "EXPLAIN PLAN FOR SELECT SUM(CASE WHEN col1 = 'a' THEN 3 ELSE 0 END) FROM a"; + + String explain = _queryEnvironment.explainQuery(query, RANDOM_REQUEST_ID_GEN.nextLong()); + //@formatter:off + assertEquals(explain, + "Execution Plan\n" + + "LogicalProject(EXPR$0=[CASE(=($1, 0), null:BIGINT, $0)])\n" + + "
PinotLogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[COUNT($1)], aggType=[FINAL])\n" + + " PinotLogicalExchange(distribution=[hash])\n" + + " PinotLogicalAggregate(group=[{}], agg#0=[$SUM0($0) FILTER $1], agg#1=[COUNT()], aggType=[LEAF])\n" + + " LogicalProject($f1=[3], $f2=[=($0, _UTF-8'a')])\n" + + " PinotLogicalTableScan(table=[[default, a]])\n"); + //@formatter:on + } + + @Test + public void testPruneEmptyCorrelateJoin() { + // queries with a correlated subquery over a provably empty input + // should be pruned to an empty LogicalValues by PruneEmptyRules.CORRELATE_LEFT_INSTANCE + // or its right-side counterpart, PruneEmptyRules.CORRELATE_RIGHT_INSTANCE + String query = "EXPLAIN PLAN FOR SELECT *\n" + + "FROM (\n" + + " SELECT * FROM a WHERE 1 = 0\n" + + ") t1\n" + + "WHERE EXISTS (\n" + + " SELECT 1\n" + + " FROM a\n" + + " WHERE a.col1 = t1.col1\n" + + ");\n"; + + String explain = _queryEnvironment.explainQuery(query, RANDOM_REQUEST_ID_GEN.nextLong()); + //@formatter:off + assertEquals(explain, + "Execution Plan\n" + + "LogicalValues(tuples=[[]])\n"); + //@formatter:on + } + + @Test + public void testJoinPushTransitivePredicate() { + // queries with an extra predicate on a join key + // should have that predicate pushed to both sides of the join where applicable + String query = "EXPLAIN PLAN FOR\n" + + "SELECT * FROM a\n" + + "JOIN b\n" + + "ON a.col1 = b.col1\n" + + "WHERE a.col1 = 1;\n"; + + String explain = _queryEnvironment.explainQuery(query, RANDOM_REQUEST_ID_GEN.nextLong()); + //@formatter:off + assertEquals(explain, + "Execution Plan\n" + + "LogicalJoin(condition=[=($0, $9)], joinType=[inner])\n" + + " PinotLogicalExchange(distribution=[hash[0]])\n" + + " LogicalFilter(condition=[=(CAST($0):INTEGER NOT NULL, 1)])\n" + + " PinotLogicalTableScan(table=[[default, a]])\n" + + " PinotLogicalExchange(distribution=[hash[0]])\n" + + " LogicalFilter(condition=[=(CAST($0):INTEGER NOT NULL, 1)])\n" + + " PinotLogicalTableScan(table=[[default, b]])\n"); + //@formatter:on + } + + @Test + public void
testJoinPushTransitivePredicateLookupJoin() { + // PinotJoinPushTransitivePredicatesRule + // should not push to the right side under the lookup join hint + String query = "EXPLAIN PLAN FOR\n" + + "SELECT /*+ joinOptions(join_strategy='lookup') */ \n" + + "* FROM a\n" + + "JOIN b\n" + + "ON a.col1 = b.col1\n" + + "WHERE a.col1 = 1;\n"; + + String explain = _queryEnvironment.explainQuery(query, RANDOM_REQUEST_ID_GEN.nextLong()); + //@formatter:off + assertEquals(explain, + "Execution Plan\n" + + "LogicalJoin(condition=[=($0, $9)], joinType=[inner])\n" + + " PinotLogicalExchange(distribution=[single])\n" + + " LogicalFilter(condition=[=(CAST($0):INTEGER NOT NULL, 1)])\n" + + " PinotLogicalTableScan(table=[[default, a]])\n" + + " PinotLogicalTableScan(table=[[default, b]])\n"); + //@formatter:on + } + + @Ignore("This test requires PRUNE_RULES to run before BASIC_RULES to pass; however, enabling that " + + "changes ~50 hardcoded plans in ResourceBasedQueriesTest. " + + "It also remains to be investigated why redundant Project and Exchange nodes appear " + + "when the extra pruning is enabled") + @Test + public void testAggregateJoinRemove() { + // queries where join is left or right join and the aggregate above it has no aggCall + // or all aggCalls are DISTINCT + // should be optimized to remove the join completely + String query = "EXPLAIN PLAN FOR\n" + + "SELECT a.col1, COUNT(DISTINCT a.col3) \n" + + "FROM a \n" + + "LEFT JOIN b ON a.col2 = b.col2\n" + + "GROUP BY a.col1;"; + + String explain = _queryEnvironment.explainQuery(query, RANDOM_REQUEST_ID_GEN.nextLong()); + //@formatter:off + assertEquals(explain, + "Execution Plan\n" + + "PinotLogicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], aggType=[FINAL])\n" + + " PinotLogicalExchange(distribution=[hash[0]])\n" + + " PinotLogicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($2)], aggType=[LEAF])\n" + + " PinotLogicalTableScan(table=[[default, a]])\n"); + //@formatter:on + } + + private static void
assertGroupBySingletonAfterJoin(DispatchableSubPlan dispatchableSubPlan, boolean shouldRewrite) { for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageMap().size(); stageId++) { if (dispatchableSubPlan.getTableNames().size() == 0 && !PlannerUtils.isRootPlanFragment(stageId)) { diff --git a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json index db28d08439..8f856b5195 100644 --- a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json +++ b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json @@ -152,7 +152,8 @@ " └── [2]@localhost:1|[1] AGGREGATE_LEAF\n", " └── [2]@localhost:1|[1] JOIN\n", " ├── [2]@localhost:1|[1] PROJECT\n", - " │ └── [2]@localhost:1|[1] TABLE SCAN (a) null\n", + " │ └── [2]@localhost:1|[1] FILTER\n", + " │ └── [2]@localhost:1|[1] TABLE SCAN (a) null\n", " └── [2]@localhost:1|[1] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n", " ├── [3]@localhost:2|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n", " ├── [3]@localhost:2|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n", @@ -160,8 +161,7 @@ " └── [3]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]}\n", " └── [3]@localhost:1|[1] PROJECT\n", " └── [3]@localhost:1|[1] FILTER\n", - " └── [3]@localhost:1|[1] TABLE SCAN (b) null\n", - "" + " └── [3]@localhost:1|[1] TABLE SCAN (b) null\n" ] }, { @@ -180,7 +180,8 @@ " └── [2]@localhost:1|[1] AGGREGATE_LEAF\n", " └── [2]@localhost:1|[1] JOIN\n", " ├── [2]@localhost:1|[1] PROJECT\n", - " │ └── [2]@localhost:1|[1] TABLE SCAN (a) null\n", + " │ └── [2]@localhost:1|[1] FILTER\n", + " │ └── [2]@localhost:1|[1] TABLE SCAN (a) null\n", " └── [2]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", " ├── [3]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[2]} (Subtree Omitted)\n", " ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[3]} (Subtree Omitted)\n", @@ -188,8 +189,7 @@ " └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[1]}\n", " └── [3]@localhost:1|[1] PROJECT\n", " └── [3]@localhost:1|[1] FILTER\n", - " └── [3]@localhost:1|[1] TABLE SCAN (b) null\n", - "" + " └── [3]@localhost:1|[1] TABLE SCAN (b) null\n" ] }, { diff --git a/pinot-query-planner/src/test/resources/queries/JoinPlans.json b/pinot-query-planner/src/test/resources/queries/JoinPlans.json index 9ce84bc53b..dea372f5fd 100644 --- a/pinot-query-planner/src/test/resources/queries/JoinPlans.json +++ b/pinot-query-planner/src/test/resources/queries/JoinPlans.json @@ -520,7 +520,8 @@ "\nLogicalJoin(condition=[=($2, $9)], joinType=[semi])", "\n LogicalJoin(condition=[=($1, $9)], joinType=[semi])", "\n LogicalJoin(condition=[=($1, $9)], joinType=[semi])", - "\n PinotLogicalTableScan(table=[[default, a]])", + "\n LogicalFilter(condition=[<($2, 100)])", + "\n PinotLogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", "\n LogicalProject(col1=[$0])", "\n LogicalFilter(condition=[SEARCH($1, Sarg[(-∞.._UTF-8'bar':VARCHAR CHARACTER SET \"UTF-8\"), (_UTF-8'bar':VARCHAR CHARACTER SET \"UTF-8\".._UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"), (_UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"..+∞)]:VARCHAR CHARACTER SET \"UTF-8\")])", @@ -548,7 +549,8 @@ "\n LogicalProject(col3=[$1])", "\n LogicalJoin(condition=[=($0, $2)], joinType=[semi])", "\n LogicalProject(col2=[$1], col3=[$2])", - "\n PinotLogicalTableScan(table=[[default, a]])", + "\n LogicalFilter(condition=[<($2, 100)])", + "\n PinotLogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", "\n LogicalProject(col1=[$0])", "\n 
LogicalFilter(condition=[SEARCH($1, Sarg[(-∞.._UTF-8'bar':VARCHAR CHARACTER SET \"UTF-8\"), (_UTF-8'bar':VARCHAR CHARACTER SET \"UTF-8\".._UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"), (_UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"..+∞)]:VARCHAR CHARACTER SET \"UTF-8\")])", @@ -569,12 +571,12 @@ "\n LogicalFilter(condition=[>($2, 10)])", "\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[COUNT($2)], aggType=[FINAL])", "\n PinotLogicalExchange(distribution=[hash[0]])", - "\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[COUNT()], aggType=[LEAF])", - "\n LogicalJoin(condition=[=($1, $2)], joinType=[semi])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[COUNT()], aggType=[LEAF])\n LogicalJoin(condition=[=($1, $2)], joinType=[semi])", "\n LogicalProject(col1=[$0], col3=[$2])", "\n LogicalJoin(condition=[=($1, $3)], joinType=[semi])", "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])", - "\n PinotLogicalTableScan(table=[[default, a]])", + "\n LogicalFilter(condition=[<($2, 100)])", + "\n PinotLogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", "\n LogicalProject(col1=[$0])", "\n LogicalFilter(condition=[SEARCH($1, Sarg[(-∞.._UTF-8'bar':VARCHAR CHARACTER SET \"UTF-8\"), (_UTF-8'bar':VARCHAR CHARACTER SET \"UTF-8\".._UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"), (_UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"..+∞)]:VARCHAR CHARACTER SET \"UTF-8\")])", @@ -601,7 +603,7 @@ "\n PinotLogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", "\n LogicalProject(col3=[$2])", - "\n LogicalFilter(condition=[SEARCH($1, Sarg[(-∞.._UTF-8'bar':VARCHAR CHARACTER SET \"UTF-8\"), (_UTF-8'bar':VARCHAR CHARACTER SET \"UTF-8\".._UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"), (_UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"..+∞)]:VARCHAR CHARACTER SET \"UTF-8\")])", + "\n 
LogicalFilter(condition=[AND(SEARCH($1, Sarg[(-∞.._UTF-8'bar':VARCHAR CHARACTER SET \"UTF-8\"), (_UTF-8'bar':VARCHAR CHARACTER SET \"UTF-8\".._UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"), (_UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"..+∞)]:VARCHAR CHARACTER SET \"UTF-8\"), <($2, 100))])", "\n PinotLogicalTableScan(table=[[default, a]])", "\n" ] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org