This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 47797ad7e8 [feature](Nereids) Push down not slot references expression 
of on clause (#11805)
47797ad7e8 is described below

commit 47797ad7e8a238bbf1d6a07395eac0df51078056
Author: mch_ucchi <41606806+sohardforan...@users.noreply.github.com>
AuthorDate: Tue Sep 20 13:41:54 2022 +0800

    [feature](Nereids) Push down not slot references expression of on clause 
(#11805)
    
    Push down the non-slot-reference expressions of the ON clause's hash conditions into projects on the join children.
    select * from t1 join t2 on t1.a + 1 = t2.b + 2 and t1.a + 1 > 2
    
    project()
    +---join(t1.a + 1 = t2.b + 2 && t1.a + 1 > 2)
        |---scan(t1)
        +---scan(t2)
    
    transform to
    
    project()
    +---join(c = d && t1.a + 1 > 2)
        |---project(t1.a + 1 as c)
        |   +---scan(t1)
        +---project(t2.b + 2 as d)
            +---scan(t2)
---
 .../org/apache/doris/nereids/rules/RuleSet.java    |   2 +
 .../org/apache/doris/nereids/rules/RuleType.java   |   1 +
 .../PushDownExpressionsInHashCondition.java        | 105 ++++++++++++
 .../nereids/trees/plans/logical/LogicalJoin.java   |  11 ++
 .../PushDownExpressionsInHashConditionTest.java    | 188 +++++++++++++++++++++
 5 files changed, 307 insertions(+)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
index 651992e1e5..1244b4cbc1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
@@ -40,6 +40,7 @@ import 
org.apache.doris.nereids.rules.implementation.LogicalTopNToPhysicalTopN;
 import org.apache.doris.nereids.rules.rewrite.logical.MergeConsecutiveFilters;
 import org.apache.doris.nereids.rules.rewrite.logical.MergeConsecutiveLimits;
 import org.apache.doris.nereids.rules.rewrite.logical.MergeConsecutiveProjects;
+import 
org.apache.doris.nereids.rules.rewrite.logical.PushDownExpressionsInHashCondition;
 import 
org.apache.doris.nereids.rules.rewrite.logical.PushDownJoinOtherCondition;
 import 
org.apache.doris.nereids.rules.rewrite.logical.PushPredicatesThroughJoin;
 import 
org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughProject;
@@ -70,6 +71,7 @@ public class RuleSet {
     public static final List<RuleFactory> PUSH_DOWN_JOIN_CONDITION_RULES = 
ImmutableList.of(
             new PushDownJoinOtherCondition(),
             new PushPredicatesThroughJoin(),
+            new PushDownExpressionsInHashCondition(),
             new PushdownProjectThroughLimit(),
             new PushdownFilterThroughProject(),
             new MergeConsecutiveProjects(),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 5204e61283..1765d543f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -82,6 +82,7 @@ public enum RuleType {
     PUSH_DOWN_JOIN_OTHER_CONDITION(RuleTypeClass.REWRITE),
     PUSH_DOWN_PREDICATE_THROUGH_LEFT_SEMI_JOIN(RuleTypeClass.REWRITE),
     PUSH_DOWN_PREDICATE_THROUGH_AGGREGATION(RuleTypeClass.REWRITE),
+    PUSH_DOWN_EXPRESSIONS_IN_HASH_CONDITIONS(RuleTypeClass.REWRITE),
     // column prune rules,
     COLUMN_PRUNE_AGGREGATION_CHILD(RuleTypeClass.REWRITE),
     COLUMN_PRUNE_FILTER_CHILD(RuleTypeClass.REWRITE),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushDownExpressionsInHashCondition.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushDownExpressionsInHashCondition.java
new file mode 100644
index 0000000000..cdb307e3b2
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushDownExpressionsInHashCondition.java
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite.logical;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.util.JoinUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Push down the non-slot operands of a join's hash conditions into projects on top of the join's children,
+ * so that every hash conjunct of the rewritten join compares two plain slots.
+ */
+public class PushDownExpressionsInHashCondition extends OneRewriteRuleFactory {
+    /*
+     * rewrite example:
+     *       join(t1.a + 1 = t2.b + 2)                            join(c = d)
+     *             /       \                                         /    \
+     *            /         \                                       /      \
+     *           /           \             ====>                   /        \
+     *          /             \                                   /          \
+     *  olapScan(t1)     olapScan(t2)            project(t1.a + 1 as c)  project(t2.b + 2 as d)
+     *                                                        |                   |
+     *                                                        |                   |
+     *                                                        |                   |
+     *                                                        |                   |
+     *                                                     olapScan(t1)        olapScan(t2)
+     * TODO: a condition such as t1.a + t2.a = t1.b is not classified as a hash join conjunct,
+     *       so this rule does not handle it.
+     */
+    @Override
+    public Rule build() {
+        return logicalJoin()
+                .when(join -> join.getHashJoinConjuncts().stream().anyMatch(equalTo ->
+                        equalTo.children().stream().anyMatch(e -> !(e instanceof Slot))))
+                .then(join -> pushDownHashConjunctOperands(join))
+                .toRule(RuleType.PUSH_DOWN_EXPRESSIONS_IN_HASH_CONDITIONS);
+    }
+
+    /**
+     * Replace every non-slot operand of the join's hash conjuncts with a slot that a project on the
+     * corresponding child computes.
+     *
+     * @param join join whose hash conjuncts contain at least one non-slot operand
+     * @return a join over projected children whose hash conjuncts reference only slots
+     */
+    private static Plan pushDownHashConjunctOperands(LogicalJoin<? extends Plan, ? extends Plan> join) {
+        // operand of a hash conjunct -> slot that replaces it in the rewritten join
+        Map<Expression, Slot> replaceMap = Maps.newHashMap();
+        // aliases each child's project must compute; insertion-ordered so the produced plan is deterministic
+        Set<NamedExpression> leftProjects = Sets.newLinkedHashSet();
+        Set<NamedExpression> rightProjects = Sets.newLinkedHashSet();
+        for (Expression conjunct : join.getHashJoinConjuncts()) {
+            Preconditions.checkArgument(conjunct instanceof EqualTo);
+            // sometimes: t1 join t2 on t2.a + 1 = t1.a + 2. Order a local copy so that child(0) belongs to the
+            // left child; the conjunct kept in the join itself is not swapped.
+            Expression ordered = JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) conjunct, join.left().getOutputSet());
+            registerOperand(ordered.child(0), replaceMap, leftProjects);
+            registerOperand(ordered.child(1), replaceMap, rightProjects);
+        }
+
+        List<Expression> newHashConjuncts = join.getHashJoinConjuncts().stream()
+                .map(equalTo -> equalTo.withChildren(equalTo.children().stream()
+                        .map(replaceMap::get)
+                        .collect(Collectors.toList())))
+                .collect(Collectors.toList());
+
+        // Each project keeps the child's full output rather than only the slots in the join output: the join
+        // output may leave out one child's slots (e.g. for a semi join), while the other join condition can
+        // still reference them.
+        Iterator<Set<NamedExpression>> projectsOfChildren = ImmutableList.of(leftProjects, rightProjects).iterator();
+        List<Plan> newChildren = Lists.newArrayList();
+        for (Plan child : join.children()) {
+            Set<NamedExpression> projects = projectsOfChildren.next();
+            projects.addAll(child.getOutput());
+            newChildren.add(new LogicalProject<>(ImmutableList.copyOf(projects), child));
+        }
+        return join.withhashJoinConjunctsAndChildren(newHashConjuncts, newChildren);
+    }
+
+    /**
+     * Record the slot that replaces one hash conjunct operand. A slot operand stands for itself; any other
+     * operand is aliased exactly once, even when it occurs in several conjuncts, and that alias is added to
+     * {@code childProjects}.
+     *
+     * @param operand one side of a hash conjunct
+     * @param replaceMap operand -> replacement slot, shared by both children
+     * @param childProjects project expressions of the child that produces {@code operand}
+     */
+    private static void registerOperand(Expression operand, Map<Expression, Slot> replaceMap,
+            Set<NamedExpression> childProjects) {
+        if (replaceMap.containsKey(operand)) {
+            return;
+        }
+        if (operand instanceof Slot) {
+            replaceMap.put(operand, (Slot) operand);
+        } else {
+            Alias alias = new Alias(operand, "expr_" + operand.toSql());
+            replaceMap.put(operand, alias.toSlot());
+            childProjects.add(alias);
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJoin.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJoin.java
index 29855b4073..87e6d4e03e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJoin.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJoin.java
@@ -250,4 +250,15 @@ public class LogicalJoin<LEFT_CHILD_TYPE extends Plan, 
RIGHT_CHILD_TYPE extends
     public RIGHT_CHILD_TYPE right() {
         return (RIGHT_CHILD_TYPE) child(1);
     }
+
+    /**
+     * Return a copy of this join that uses the given hash join conjuncts. The join type, other join
+     * condition, children and join reorder context are taken from this join.
+     *
+     * @param hashJoinConjuncts the new hash join conjuncts
+     * @return a new join over the same children as this one
+     */
+    public LogicalJoin<LEFT_CHILD_TYPE, RIGHT_CHILD_TYPE> withHashJoinConjuncts(List<Expression> hashJoinConjuncts) {
+        return new LogicalJoin<>(
+                joinType, hashJoinConjuncts, otherJoinCondition, left(), right(), joinReorderContext);
+    }
+
+    /**
+     * Return a copy of this join with new hash join conjuncts and new children. The join type, other join
+     * condition and join reorder context are taken from this join.
+     *
+     * @param hashJoinConjuncts the new hash join conjuncts
+     * @param children exactly two plans: the new left child followed by the new right child
+     * @return a new join over the given children
+     */
+    public LogicalJoin<Plan, Plan> withhashJoinConjunctsAndChildren(
+            List<Expression> hashJoinConjuncts, List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 2,
+                "a join needs exactly 2 children, but got %s", children.size());
+        return new LogicalJoin<>(joinType, hashJoinConjuncts, otherJoinCondition, children.get(0), children.get(1),
+                joinReorderContext);
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushDownExpressionsInHashConditionTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushDownExpressionsInHashConditionTest.java
new file mode 100644
index 0000000000..2033b68668
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushDownExpressionsInHashConditionTest.java
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite.logical;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.NamedExpressionUtil;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.util.PatternMatchSupported;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+public class PushDownExpressionsInHashConditionTest extends TestWithFeService implements PatternMatchSupported {
+    // Queries whose join conditions contain non-slot expressions; each one must be planned without errors.
+    private final List<String> testSql = ImmutableList.of(
+            "SELECT * FROM T1 JOIN T2 ON T1.ID + 1 = T2.ID + 2 AND T1.ID + 1 > 2",
+            "SELECT * FROM (SELECT * FROM T1) X JOIN (SELECT * FROM T2) Y ON X.ID + 1 = Y.ID + 2 AND X.ID + 1 > 2",
+            "SELECT * FROM T1 JOIN (SELECT ID, SUM(SCORE) SCORE FROM T2 GROUP BY ID) T"
+                    + " ON T1.ID + 1 = T.ID AND T.SCORE < 10",
+            "SELECT * FROM T1 JOIN (SELECT ID, SUM(SCORE) SCORE FROM T2 GROUP BY ID ORDER BY ID) T"
+                    + " ON T1.ID + 1 = T.ID AND T.SCORE < 10"
+    );
+
+    @Override
+    protected void runBeforeAll() throws Exception {
+        createDatabase("test");
+        connectContext.setDatabase("default_cluster:test");
+
+        createTables(
+                "CREATE TABLE IF NOT EXISTS T1 (\n"
+                        + "    id bigint,\n"
+                        + "    score bigint\n"
+                        + ")\n"
+                        + "DUPLICATE KEY(id)\n"
+                        + "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+                        + "PROPERTIES (\n"
+                        + "  \"replication_num\" = \"1\"\n"
+                        + ")\n",
+                "CREATE TABLE IF NOT EXISTS T2 (\n"
+                        + "    id bigint,\n"
+                        + "    score bigint\n"
+                        + ")\n"
+                        + "DUPLICATE KEY(id)\n"
+                        + "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+                        + "PROPERTIES (\n"
+                        + "  \"replication_num\" = \"1\"\n"
+                        + ")\n"
+        );
+    }
+
+    @Override
+    protected void runBeforeEach() throws Exception {
+        NamedExpressionUtil.clear();
+    }
+
+    @Test
+    public void testGeneratePhysicalPlan() {
+        // Smoke test: every query in testSql must go through the whole planner without throwing.
+        testSql.forEach(sql -> {
+            try {
+                PhysicalPlan plan = new NereidsPlanner(createStatementCtx(sql)).plan(
+                        new NereidsParser().parseSingle(sql),
+                        PhysicalProperties.ANY
+                );
+                System.out.println(plan.treeString());
+            } catch (AnalysisException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    @Test
+    public void testSimpleCase() {
+        PlanChecker.from(connectContext)
+                .analyze("SELECT * FROM T1 JOIN T2 ON T1.ID + 1 = T2.ID + 2 AND T1.ID + 1 > 2")
+                .applyTopDown(new FindHashConditionForJoin())
+                .applyTopDown(new PushDownExpressionsInHashCondition())
+                .matches(
+                        logicalProject(
+                                logicalJoin(
+                                        logicalProject(
+                                                logicalOlapScan()
+                                        ),
+                                        logicalProject(
+                                                logicalOlapScan()
+                                        )
+                                )
+                        )
+                );
+    }
+
+    @Test
+    public void testSubQueryCase() {
+        PlanChecker.from(connectContext)
+                .analyze("SELECT * FROM (SELECT * FROM T1) X JOIN (SELECT * FROM T2) Y"
+                        + " ON X.ID + 1 = Y.ID + 2 AND X.ID + 1 > 2")
+                .applyTopDown(new FindHashConditionForJoin())
+                .applyTopDown(new PushDownExpressionsInHashCondition())
+                .matches(
+                        logicalProject(
+                                logicalJoin(
+                                        logicalProject(
+                                                logicalProject(
+                                                        logicalOlapScan()
+                                                )
+                                        ),
+                                        logicalProject(
+                                                logicalProject(
+                                                        logicalOlapScan()
+                                                )
+                                        )
+                                )
+                        )
+                );
+    }
+
+    @Test
+    public void testAggNodeCase() {
+        PlanChecker.from(connectContext)
+                .analyze("SELECT * FROM T1 JOIN (SELECT ID, SUM(SCORE) SCORE FROM T2 GROUP BY ID) T"
+                        + " ON T1.ID + 1 = T.ID AND T.SCORE = T1.SCORE + 10")
+                .applyTopDown(new FindHashConditionForJoin())
+                .applyTopDown(new PushDownExpressionsInHashCondition())
+                .matches(
+                        logicalProject(
+                                logicalJoin(
+                                        logicalProject(
+                                                logicalOlapScan()
+                                        ),
+                                        logicalProject(
+                                                logicalAggregate(
+                                                        logicalOlapScan()
+                                                )
+                                        )
+                                )
+                        )
+                );
+    }
+
+    @Test
+    public void testSortNodeCase() {
+        PlanChecker.from(connectContext)
+                .analyze("SELECT * FROM T1 JOIN (SELECT ID, SUM(SCORE) SCORE FROM T2 GROUP BY ID ORDER BY ID) T"
+                        + " ON T1.ID + 1 = T.ID AND T.SCORE = T1.SCORE + 10")
+                .applyTopDown(new FindHashConditionForJoin())
+                .applyTopDown(new PushDownExpressionsInHashCondition())
+                .matches(
+                        logicalProject(
+                                logicalJoin(
+                                        logicalProject(
+                                                logicalOlapScan()
+                                        ),
+                                        logicalProject(
+                                                logicalSort(
+                                                        logicalAggregate(
+                                                                logicalOlapScan()
+                                                        )
+                                                )
+                                        )
+                                )
+                        )
+                );
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to