This is an automated email from the ASF dual-hosted git repository. kangkaisen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 59676a1 [BUG] fix 4149, add sessionVariable to choose broadcastjoin first when cardinality cannot be estimated (#4150) 59676a1 is described below commit 59676a11170da90b2a9e883ca02f1af6dc2d659b Author: wutiangan <wutian...@gmail.com> AuthorDate: Wed Jul 29 12:28:52 2020 +0800 [BUG] fix 4149, add sessionVariable to choose broadcastjoin first when cardinality cannot be estimated (#4150) --- docs/en/administrator-guide/variables.md | 6 ++++++ docs/zh-CN/administrator-guide/variables.md | 5 +++++ .../org/apache/doris/planner/DistributedPlanner.java | 16 +++++++++++++++- .../java/org/apache/doris/qe/SessionVariable.java | 8 ++++++++ .../main/java/org/apache/doris/qe/VariableMgr.java | 6 ++++++ .../java/org/apache/doris/planner/QueryPlanTest.java | 19 +++++++++++++++++++ 6 files changed, 59 insertions(+), 1 deletion(-) diff --git a/docs/en/administrator-guide/variables.md b/docs/en/administrator-guide/variables.md index 5091be4..a41396a 100644 --- a/docs/en/administrator-guide/variables.md +++ b/docs/en/administrator-guide/variables.md @@ -338,3 +338,9 @@ SET forward_to_master = concat('tr', 'u', 'e'); * `rewrite_count_distinct_to_bitmap_hll` Whether to rewrite count distinct queries of bitmap and HLL types as bitmap_union_count and hll_union_agg. + +* `prefer_join_method` + + When choosing the join method (broadcast join or shuffle join), this variable determines which method is preferred if the broadcast join cost and the shuffle join cost are equal. + + Currently, the valid values for this variable are "broadcast" and "shuffle". 
diff --git a/docs/zh-CN/administrator-guide/variables.md b/docs/zh-CN/administrator-guide/variables.md index 3d21575..f9e6654 100644 --- a/docs/zh-CN/administrator-guide/variables.md +++ b/docs/zh-CN/administrator-guide/variables.md @@ -338,3 +338,8 @@ SET forward_to_master = concat('tr', 'u', 'e'); 是否将 bitmap 和 hll 类型的 count distinct 查询重写为 bitmap_union_count 和 hll_union_agg 。 +* `prefer_join_method` + + 在选择join的具体实现方式是broadcast join还是shuffle join时,如果broadcast join cost和shuffle join cost相等时,优先选择哪种join方式。 + + 目前该变量的可选值为"broadcast" 或者 "shuffle"。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 25be3af..88775e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -281,6 +281,20 @@ public class DistributedPlanner { } /** + * When broadcastCost and partitionCost are equal, there is no uniform standard for which join implementation is better. + * Some scenarios are suitable for broadcast join, and some scenarios are suitable for shuffle join. + * Therefore, we add a SessionVariable to help users choose a better join implementation. + */ + private boolean isBroadcastCostSmaller(long broadcastCost, long partitionCost) { + String joinMethod = ConnectContext.get().getSessionVariable().getPreferJoinMethod(); + if (joinMethod.equalsIgnoreCase("broadcast")) { + return broadcastCost <= partitionCost; + } else { + return broadcastCost < partitionCost; + } + } + + /** * Creates either a broadcast join or a repartitioning join, depending on the expected cost. If any of the inputs to * the cost computation is unknown, it assumes the cost will be 0. Costs being equal, it'll favor partitioned over * broadcast joins. 
If perNodeMemLimit > 0 and the size of the hash table for a broadcast join is expected to exceed @@ -351,7 +365,7 @@ public class DistributedPlanner { && (perNodeMemLimit == 0 || Math.round( (double) rhsDataSize * PlannerContext.HASH_TBL_SPACE_OVERHEAD) <= perNodeMemLimit) && (node.getInnerRef().isBroadcastJoin() || (!node.getInnerRef().isPartitionJoin() - && broadcastCost < partitionCost))) { + && isBroadcastCostSmaller(broadcastCost, partitionCost)))) { doBroadcast = true; } else { doBroadcast = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 393cdb0..c2a0b88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -74,6 +74,7 @@ public class SessionVariable implements Serializable, Writable { public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num"; public static final String ENABLE_INSERT_STRICT = "enable_insert_strict"; public static final String ENABLE_SPILLING = "enable_spilling"; + public static final String PREFER_JOIN_METHOD = "prefer_join_method"; public static final int MIN_EXEC_INSTANCE_NUM = 1; public static final int MAX_EXEC_INSTANCE_NUM = 32; // if set to true, some of stmt will be forwarded to master FE to get result @@ -209,6 +210,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = DISABLE_COLOCATE_JOIN) private boolean disableColocateJoin = false; + @VariableMgr.VarAttr(name = PREFER_JOIN_METHOD) + private String preferJoinMethod = "broadcast"; + /* * the parallel exec instance num for one Fragment in one BE * 1 means disable this feature @@ -397,6 +401,10 @@ public class SessionVariable implements Serializable, Writable { return disableColocateJoin; } + public String getPreferJoinMethod() {return preferJoinMethod; } + + public void 
setPreferJoinMethod(String preferJoinMethod) {this.preferJoinMethod = preferJoinMethod; } + public int getParallelExecInstanceNum() { return parallelExecInstanceNum; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java index 3574ae5..b48fbdc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java @@ -244,6 +244,12 @@ public class VariableMgr { } } + if (setVar.getVariable().toLowerCase().equals("prefer_join_method")) { + if (!value.toLowerCase().equals("broadcast") && !value.toLowerCase().equals("shuffle")) { + ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, "prefer_join_method", value); + } + } + if (setVar.getType() == SetType.GLOBAL) { wlock.lock(); try { diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index 95fa585..433671e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -907,4 +907,23 @@ public class QueryPlanTest { explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); Assert.assertFalse(explainString.contains("INNER JOIN (PARTITIONED)")); } + + @Test + public void testPreferBroadcastJoin() throws Exception { + connectContext.setDatabase("default_cluster:test"); + String queryStr = "explain select * from (select k1 from jointest group by k1)t2, jointest t1 where t1.k1 = t2.k1"; + + // default set PreferBroadcastJoin true + String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + Assert.assertTrue(explainString.contains("INNER JOIN (BROADCAST)")); + + connectContext.getSessionVariable().setPreferJoinMethod("shuffle"); + explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + 
Assert.assertTrue(explainString.contains("INNER JOIN (PARTITIONED)")); + + + connectContext.getSessionVariable().setPreferJoinMethod("broadcast"); + explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + Assert.assertTrue(explainString.contains("INNER JOIN (BROADCAST)")); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org