This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit dcfcedaefa573b4e348f5ff7442bb21895065eba Author: starocean999 <40539150+starocean...@users.noreply.github.com> AuthorDate: Tue Dec 20 09:44:47 2022 +0800 [fix](join)the policy to choose colocate join is not correct (#15140) * [hotfix](dev-1.0.1) fix colocate join bug in vec engine after introducing output tuple (#10651) To support vectorized outer join, we introduced an output tuple for the hash join node, but it breaks the colocate join check. To solve this problem, we need to map the output slot ids to the children's slot ids of the hash join node, so that colocate join can be checked correctly. * fix colocate join bug * fix non vec colocate join issue Co-authored-by: lichi <li...@rateup.com.cn> * add test cases Co-authored-by: lichi <li...@rateup.com.cn> --- .../apache/doris/planner/DistributedPlanner.java | 2 +- .../org/apache/doris/planner/HashJoinNode.java | 13 ++++ .../java/org/apache/doris/planner/PlanNode.java | 19 ++++- .../correctness_p0/test_colocate_join.groovy | 84 ++++++++++++++++++++++ 4 files changed, 115 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index e3bd99c064..6382787d1b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -489,7 +489,7 @@ public class DistributedPlanner { return null; } ScanNode scanNode = planFragment.getPlanRoot() - .getScanNodeInOneFragmentByTupleId(slotRef.getDesc().getParent().getId()); + .getScanNodeInOneFragmentBySlotRef(slotRef); if (scanNode == null) { cannotReason.add(DistributedPlanColocateRule.REDISTRIBUTED_SRC_DATA); return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index 8eb8e85256..a48778bccd 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -841,4 +841,17 @@ public class HashJoinNode extends JoinNodeBase { public void setOtherJoinConjuncts(List<Expr> otherJoinConjuncts) { this.otherJoinConjuncts = otherJoinConjuncts; } + + SlotRef getMappedInputSlotRef(SlotRef slotRef) { + if (outputSmap != null) { + Expr mappedExpr = outputSmap.mappingForRhsExpr(slotRef); + if (mappedExpr != null && mappedExpr instanceof SlotRef) { + return (SlotRef) mappedExpr; + } else { + return null; + } + } else { + return slotRef; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 4ae249f4cf..2c3baf6ec8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -28,6 +28,7 @@ import org.apache.doris.analysis.ExprId; import org.apache.doris.analysis.ExprSubstitutionMap; import org.apache.doris.analysis.FunctionName; import org.apache.doris.analysis.SlotId; +import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; import org.apache.doris.catalog.Function; @@ -897,12 +898,26 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats { return sb.toString(); } - public ScanNode getScanNodeInOneFragmentByTupleId(TupleId tupleId) { + public ScanNode getScanNodeInOneFragmentBySlotRef(SlotRef slotRef) { + TupleId tupleId = slotRef.getDesc().getParent().getId(); if (this instanceof ScanNode && tupleIds.contains(tupleId)) { return (ScanNode) this; + } else if (this instanceof HashJoinNode) { + HashJoinNode hashJoinNode = (HashJoinNode) this; + SlotRef inputSlotRef = hashJoinNode.getMappedInputSlotRef(slotRef); + if (inputSlotRef != null) { + for (PlanNode planNode : children) { + ScanNode scanNode = 
planNode.getScanNodeInOneFragmentBySlotRef(inputSlotRef); + if (scanNode != null) { + return scanNode; + } + } + } else { + return null; + } } else if (!(this instanceof ExchangeNode)) { for (PlanNode planNode : children) { - ScanNode scanNode = planNode.getScanNodeInOneFragmentByTupleId(tupleId); + ScanNode scanNode = planNode.getScanNodeInOneFragmentBySlotRef(slotRef); if (scanNode != null) { return scanNode; } diff --git a/regression-test/suites/correctness_p0/test_colocate_join.groovy b/regression-test/suites/correctness_p0/test_colocate_join.groovy new file mode 100644 index 0000000000..6b1e81eb80 --- /dev/null +++ b/regression-test/suites/correctness_p0/test_colocate_join.groovy @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_colocate_join") { + sql """ DROP TABLE IF EXISTS `test_colo1` """ + sql """ DROP TABLE IF EXISTS `test_colo2` """ + sql """ DROP TABLE IF EXISTS `test_colo3` """ + sql """ + CREATE TABLE `test_colo1` ( + `id` varchar(64) NULL, + `name` varchar(64) NULL, + `age` int NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`,`name`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`,`name`) BUCKETS 4 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "colocate_with" = "group", + "in_memory" = "false", + "storage_format" = "V2", + "disable_auto_compaction" = "false" + ); + """ + sql """ + CREATE TABLE `test_colo2` ( + `id` varchar(64) NULL, + `name` varchar(64) NULL, + `age` int NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`,`name`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`,`name`) BUCKETS 4 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "colocate_with" = "group", + "in_memory" = "false", + "storage_format" = "V2", + "disable_auto_compaction" = "false" + ); + """ + + sql """ + CREATE TABLE `test_colo3` ( + `id` varchar(64) NULL, + `name` varchar(64) NULL, + `age` int NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`,`name`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`,`name`) BUCKETS 4 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "colocate_with" = "group", + "in_memory" = "false", + "storage_format" = "V2", + "disable_auto_compaction" = "false" + ); + """ + + sql """insert into test_colo1 values('1','a',12);""" + sql """insert into test_colo2 values('1','a',12);""" + sql """insert into test_colo3 values('1','a',12);""" + + explain { + sql("select a.id,a.name,b.id,b.name,c.id,c.name from test_colo1 a inner join test_colo2 b on a.id = b.id and a.name = b.name inner join test_colo3 c on a.id=c.id and a.name= c.name") + contains "4:VHASH JOIN\n | join op: INNER JOIN(COLOCATE[])[]" + contains "2:VHASH JOIN\n | join op: INNER JOIN(COLOCATE[])[]" + } +} 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org