This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 47797ad7e8 [feature](Nereids) Push down not slot references expression of on clause (#11805) 47797ad7e8 is described below commit 47797ad7e8a238bbf1d6a07395eac0df51078056 Author: mch_ucchi <41606806+sohardforan...@users.noreply.github.com> AuthorDate: Tue Sep 20 13:41:54 2022 +0800 [feature](Nereids) Push down not slot references expression of on clause (#11805) pushdown not slotreferences expr of on clause. select * from t1 join t2 on t1.a + 1 = t2.b + 2 and t1.a + 1 > 2 project() +---join(t1.a + 1 = t2.b + 2 && t1.a + 1 > 2) |---scan(t1) +---scan(t2) transform to project() +---join(c = d && c > 2) |---project(t1.a -> t1.a + 1) | +---scan(t1) +---project(t2.b -> t2.b + 2) +---scan(t2) --- .../org/apache/doris/nereids/rules/RuleSet.java | 2 + .../org/apache/doris/nereids/rules/RuleType.java | 1 + .../PushDownExpressionsInHashCondition.java | 105 ++++++++++++ .../nereids/trees/plans/logical/LogicalJoin.java | 11 ++ .../PushDownExpressionsInHashConditionTest.java | 188 +++++++++++++++++++++ 5 files changed, 307 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java index 651992e1e5..1244b4cbc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java @@ -40,6 +40,7 @@ import org.apache.doris.nereids.rules.implementation.LogicalTopNToPhysicalTopN; import org.apache.doris.nereids.rules.rewrite.logical.MergeConsecutiveFilters; import org.apache.doris.nereids.rules.rewrite.logical.MergeConsecutiveLimits; import org.apache.doris.nereids.rules.rewrite.logical.MergeConsecutiveProjects; +import org.apache.doris.nereids.rules.rewrite.logical.PushDownExpressionsInHashCondition; import org.apache.doris.nereids.rules.rewrite.logical.PushDownJoinOtherCondition; import 
org.apache.doris.nereids.rules.rewrite.logical.PushPredicatesThroughJoin; import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughProject; @@ -70,6 +71,7 @@ public class RuleSet { public static final List<RuleFactory> PUSH_DOWN_JOIN_CONDITION_RULES = ImmutableList.of( new PushDownJoinOtherCondition(), new PushPredicatesThroughJoin(), + new PushDownExpressionsInHashCondition(), new PushdownProjectThroughLimit(), new PushdownFilterThroughProject(), new MergeConsecutiveProjects(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index 5204e61283..1765d543f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -82,6 +82,7 @@ public enum RuleType { PUSH_DOWN_JOIN_OTHER_CONDITION(RuleTypeClass.REWRITE), PUSH_DOWN_PREDICATE_THROUGH_LEFT_SEMI_JOIN(RuleTypeClass.REWRITE), PUSH_DOWN_PREDICATE_THROUGH_AGGREGATION(RuleTypeClass.REWRITE), + PUSH_DOWN_EXPRESSIONS_IN_HASH_CONDITIONS(RuleTypeClass.REWRITE), // column prune rules, COLUMN_PRUNE_AGGREGATION_CHILD(RuleTypeClass.REWRITE), COLUMN_PRUNE_FILTER_CHILD(RuleTypeClass.REWRITE), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushDownExpressionsInHashCondition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushDownExpressionsInHashCondition.java new file mode 100644 index 0000000000..cdb307e3b2 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushDownExpressionsInHashCondition.java @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite.logical;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.util.JoinUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Push the expressions used by the hash join conjuncts down into a project placed on
+ * each child of the join, so that the conjuncts themselves only reference slots.
+ */
+public class PushDownExpressionsInHashCondition extends OneRewriteRuleFactory {
+    /*
+     * rewrite example:
+     *       join(t1.a + 1 = t2.b + 2)                            join(c = d)
+     *             /       \                                         /    \
+     *            /         \                                       /      \
+     *           /           \             ====>                   /        \
+     *          /             \                                   /          \
+     *  olapScan(t1)     olapScan(t2)            project(t1.a + 1 as c)  project(t2.b + 2 as d)
+     *                                                        |                   |
+     *                                                        |                   |
+     *                                                        |                   |
+     *                                                        |                   |
+     *                                                     olapScan(t1)        olapScan(t2)
+     * TODO: now t1.a + t2.a = t1.b is not in hashJoinConjuncts. The rule will not handle it.
+     */
+    @Override
+    public Rule build() {
+        return logicalJoin()
+                // only fire when at least one side of some hash conjunct is not already a slot
+                .when(join -> join.getHashJoinConjuncts().stream().anyMatch(equalTo ->
+                        equalTo.children().stream().anyMatch(e -> !(e instanceof Slot))))
+                .then(join -> {
+                    // index 0 collects child(0) of every (reordered) conjunct and feeds the project
+                    // over the first join child; index 1 does the same for child(1) / second child.
+                    List<List<Expression>> exprsOfHashConjuncts =
+                            Lists.newArrayList(Lists.newArrayList(), Lists.newArrayList());
+                    Map<Expression, Alias> exprMap = Maps.newHashMap();
+                    join.getHashJoinConjuncts().forEach(conjunct -> {
+                        Preconditions.checkArgument(conjunct instanceof EqualTo);
+                        // sometimes: t1 join t2 on t2.a + 1 = t1.a + 2, so check the situation, but actually it
+                        // doesn't swap the two sides.
+                        conjunct = JoinUtils.swapEqualToForChildrenOrder(
+                                (EqualTo) conjunct, join.left().getOutputSet());
+                        exprsOfHashConjuncts.get(0).add(conjunct.child(0));
+                        exprsOfHashConjuncts.get(1).add(conjunct.child(1));
+                        // an expression may occur in several conjuncts; create its alias only once
+                        conjunct.children().forEach(expr ->
+                                exprMap.computeIfAbsent(expr, e -> new Alias(e, "expr_" + e.toSql())));
+                    });
+                    // consumed once per join child, in child order, by the stream below
+                    Iterator<List<Expression>> iter = exprsOfHashConjuncts.iterator();
+                    return join.withhashJoinConjunctsAndChildren(
+                            // rewrite every conjunct to compare the slots of the pushed-down aliases
+                            join.getHashJoinConjuncts().stream()
+                                    .map(equalTo -> equalTo.withChildren(equalTo.children()
+                                            .stream().map(expr -> exprMap.get(expr).toSlot())
+                                            .collect(Collectors.toList())))
+                                    .collect(Collectors.toList()),
+                            // wrap each child in a project of (aliases for its side) + (its join-visible slots);
+                            // distinct() keeps a repeated expression from being projected twice
+                            join.children().stream().map(
+                                    plan -> new LogicalProject<>(new ImmutableList.Builder<NamedExpression>()
+                                            .addAll(iter.next().stream().distinct()
+                                                    .map(expr -> exprMap.get(expr))
+                                                    .collect(Collectors.toList()))
+                                            .addAll(getOutput(plan, join)).build(), plan))
+                                    .collect(Collectors.toList()));
+                }).toRule(RuleType.PUSH_DOWN_EXPRESSIONS_IN_HASH_CONDITIONS);
+    }
+
+    /**
+     * Collect the slots that appear in the output set of both {@code plan} and {@code join}.
+     *
+     * @param plan a child of the join
+     * @param join the join being rewritten
+     * @return the common slots, in the iteration order of {@code plan.getOutputSet()}
+     */
+    private List<Slot> getOutput(Plan plan, LogicalJoin<?, ?> join) {
+        // LinkedHashSet instead of HashSet: do not let hash order shuffle the projected columns
+        Set<Slot> intersectionSlots = Sets.newLinkedHashSet(plan.getOutputSet());
+        intersectionSlots.retainAll(join.getOutputSet());
+        return Lists.newArrayList(intersectionSlots);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJoin.java
index 29855b4073..87e6d4e03e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJoin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJoin.java
@@ -250,4 +250,15 @@ public class LogicalJoin<LEFT_CHILD_TYPE extends Plan, RIGHT_CHILD_TYPE extends
     public RIGHT_CHILD_TYPE right() {
         return (RIGHT_CHILD_TYPE) child(1);
     }
+
+    public LogicalJoin withHashJoinConjuncts(List<Expression> hashJoinConjuncts) {
+        return new LogicalJoin<>(
+                joinType, hashJoinConjuncts, this.otherJoinCondition, left(), right(), joinReorderContext);
+    }
+
+    public LogicalJoin withhashJoinConjunctsAndChildren(List<Expression> hashJoinConjuncts, List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 2);
+        return new LogicalJoin<>(joinType, hashJoinConjuncts, otherJoinCondition, children.get(0), children.get(1),
+                joinReorderContext);
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushDownExpressionsInHashConditionTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushDownExpressionsInHashConditionTest.java
new file mode 100644
index 0000000000..2033b68668
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushDownExpressionsInHashConditionTest.java
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite.logical;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.NamedExpressionUtil;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.util.PatternMatchSupported;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+/**
+ * Tests for {@link PushDownExpressionsInHashCondition}: each case analyzes a join query,
+ * applies FindHashConditionForJoin and then the rule, and matches the resulting plan shape.
+ */
+public class PushDownExpressionsInHashConditionTest extends TestWithFeService implements PatternMatchSupported {
+    // queries planned end-to-end by testGeneratePhysicalPlan
+    private final List<String> testSql = ImmutableList.of(
+            "SELECT * FROM T1 JOIN T2 ON T1.ID + 1 = T2.ID + 2 AND T1.ID + 1 > 2",
+            "SELECT * FROM (SELECT * FROM T1) X JOIN (SELECT * FROM T2) Y ON X.ID + 1 = Y.ID + 2 AND X.ID + 1 > 2",
+            "SELECT * FROM T1 JOIN (SELECT ID, SUM(SCORE) SCORE FROM T2 GROUP BY ID) T ON T1.ID + 1 = T.ID AND T.SCORE < 10",
+            "SELECT * FROM T1 JOIN (SELECT ID, SUM(SCORE) SCORE FROM T2 GROUP BY ID ORDER BY ID) T ON T1.ID + 1 = T.ID AND T.SCORE < 10"
+    );
+
+    @Override
+    protected void runBeforeAll() throws Exception {
+        createDatabase("test");
+        connectContext.setDatabase("default_cluster:test");
+
+        createTables(
+                "CREATE TABLE IF NOT EXISTS T1 (\n"
+                        + " id bigint,\n"
+                        + " score bigint\n"
+                        + ")\n"
+                        + "DUPLICATE KEY(id)\n"
+                        + "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+                        + "PROPERTIES (\n"
+                        + " \"replication_num\" = \"1\"\n"
+                        + ")\n",
+                "CREATE TABLE IF NOT EXISTS T2 (\n"
+                        + " id bigint,\n"
+                        + " score bigint\n"
+                        + ")\n"
+                        + "DUPLICATE KEY(id)\n"
+                        + "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+                        + "PROPERTIES (\n"
+                        + " \"replication_num\" = \"1\"\n"
+                        + ")\n"
+        );
+    }
+
+    @Override
+    protected void runBeforeEach() throws Exception {
+        NamedExpressionUtil.clear();
+    }
+
+    /**
+     * Smoke test: every query in {@code testSql} must plan without throwing; the plan is printed.
+     */
+    @Test
+    public void testGeneratePhysicalPlan() {
+        // uses the testSql field; the former local copy shadowed it and left the field unused
+        testSql.forEach(sql -> {
+            try {
+                PhysicalPlan plan = new NereidsPlanner(createStatementCtx(sql)).plan(
+                        new NereidsParser().parseSingle(sql),
+                        PhysicalProperties.ANY
+                );
+                System.out.println(plan.treeString());
+            } catch (AnalysisException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    @Test
+    public void testSimpleCase() {
+        PlanChecker.from(connectContext)
+                .analyze("SELECT * FROM T1 JOIN T2 ON T1.ID + 1 = T2.ID + 2 AND T1.ID + 1 > 2")
+                .applyTopDown(new FindHashConditionForJoin())
+                .applyTopDown(new PushDownExpressionsInHashCondition())
+                .matches(
+                        logicalProject(
+                                logicalJoin(
+                                        logicalProject(
+                                                logicalOlapScan()
+                                        ),
+                                        logicalProject(
+                                                logicalOlapScan()
+                                        )
+                                )
+                        )
+                );
+    }
+
+    @Test
+    public void testSubQueryCase() {
+        PlanChecker.from(connectContext)
+                .analyze(
+                        "SELECT * FROM (SELECT * FROM T1) X JOIN (SELECT * FROM T2) Y ON X.ID + 1 = Y.ID + 2 AND X.ID + 1 > 2")
+                .applyTopDown(new FindHashConditionForJoin())
+                .applyTopDown(new PushDownExpressionsInHashCondition())
+                .matches(
+                        logicalProject(
+                                logicalJoin(
+                                        logicalProject(
+                                                logicalProject(
+                                                        logicalOlapScan()
+                                                )
+                                        ),
+                                        logicalProject(
+                                                logicalProject(
+                                                        logicalOlapScan()
+                                                )
+                                        )
+                                )
+                        )
+                );
+    }
+
+    @Test
+    public void testAggNodeCase() {
+        PlanChecker.from(connectContext)
+                .analyze(
+                        "SELECT * FROM T1 JOIN (SELECT ID, SUM(SCORE) SCORE FROM T2 GROUP BY ID) T ON T1.ID + 1 = T.ID AND T.SCORE = T1.SCORE + 10")
+                .applyTopDown(new FindHashConditionForJoin())
+                .applyTopDown(new PushDownExpressionsInHashCondition())
+                .matches(
+                        logicalProject(
+                                logicalJoin(
+                                        logicalProject(
+                                                logicalOlapScan()
+                                        ),
+                                        logicalProject(
+                                                logicalAggregate(
+                                                        logicalOlapScan()
+                                                )
+                                        )
+                                )
+                        )
+                );
+    }
+
+    @Test
+    public void testSortNodeCase() {
+        PlanChecker.from(connectContext)
+                .analyze(
+                        "SELECT * FROM T1 JOIN (SELECT ID, SUM(SCORE) SCORE FROM T2 GROUP BY ID ORDER BY ID) T ON T1.ID + 1 = T.ID AND T.SCORE = T1.SCORE + 10")
+                .applyTopDown(new FindHashConditionForJoin())
+                .applyTopDown(new PushDownExpressionsInHashCondition())
+                .matches(
+                        logicalProject(
+                                logicalJoin(
+                                        logicalProject(
+                                                logicalOlapScan()
+                                        ),
+                                        logicalProject(
+                                                logicalSort(
+                                                        logicalAggregate(
+                                                                logicalOlapScan()
+                                                        )
+                                                )
+                                        )
+                                )
+                        )
+                );
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org