This is an automated email from the ASF dual-hosted git repository.

kangkaisen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 59676a1  [BUG] fix 4149, add sessionVariable to choose broadcastjoin 
first when cardinality cannot be estimated (#4150)
59676a1 is described below

commit 59676a11170da90b2a9e883ca02f1af6dc2d659b
Author: wutiangan <wutian...@gmail.com>
AuthorDate: Wed Jul 29 12:28:52 2020 +0800

    [BUG] fix 4149, add sessionVariable to choose broadcastjoin first when 
cardinality cannot be estimated (#4150)
---
 docs/en/administrator-guide/variables.md              |  6 ++++++
 docs/zh-CN/administrator-guide/variables.md           |  5 +++++
 .../org/apache/doris/planner/DistributedPlanner.java  | 16 +++++++++++++++-
 .../java/org/apache/doris/qe/SessionVariable.java     |  8 ++++++++
 .../main/java/org/apache/doris/qe/VariableMgr.java    |  6 ++++++
 .../java/org/apache/doris/planner/QueryPlanTest.java  | 19 +++++++++++++++++++
 6 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/docs/en/administrator-guide/variables.md 
b/docs/en/administrator-guide/variables.md
index 5091be4..a41396a 100644
--- a/docs/en/administrator-guide/variables.md
+++ b/docs/en/administrator-guide/variables.md
@@ -338,3 +338,9 @@ SET forward_to_master = concat('tr', 'u', 'e');
 * `rewrite_count_distinct_to_bitmap_hll`
 
     Whether to rewrite count distinct queries of bitmap and HLL types as 
bitmap_union_count and hll_union_agg.
+
+* `prefer_join_method`
+
+    Specifies which join method (broadcast join or shuffle join) to prefer 
when the broadcast join cost and the shuffle join cost are equal.
+
+    Currently, the valid values for this variable are "broadcast" and 
"shuffle".
diff --git a/docs/zh-CN/administrator-guide/variables.md 
b/docs/zh-CN/administrator-guide/variables.md
index 3d21575..f9e6654 100644
--- a/docs/zh-CN/administrator-guide/variables.md
+++ b/docs/zh-CN/administrator-guide/variables.md
@@ -338,3 +338,8 @@ SET forward_to_master = concat('tr', 'u', 'e');
 
     是否将 bitmap 和 hll 类型的 count distinct 查询重写为 bitmap_union_count 和 
hll_union_agg 。
 
+* `prefer_join_method`
+
+    在选择join的具体实现方式是broadcast join还是shuffle join时,如果broadcast join cost和shuffle 
join cost相等时,优先选择哪种join方式。
+
+    目前该变量的可选值为"broadcast" 或者 "shuffle"。
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 25be3af..88775e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -281,6 +281,20 @@ public class DistributedPlanner {
     }
 
     /**
+     * When broadcastCost and partitionCost are equal, there is no uniform 
standard for which join implementation is better.
+     * Some scenarios are suitable for broadcast join, and some scenarios are 
suitable for shuffle join.
+     * Therefore, we add a SessionVariable to help users choose a better join 
implementation.
+     */
+    private boolean isBroadcastCostSmaller(long broadcastCost, long 
partitionCost)  {
+        String joinMethod = 
ConnectContext.get().getSessionVariable().getPreferJoinMethod();
+        if (joinMethod.equalsIgnoreCase("broadcast")) {
+            return broadcastCost <= partitionCost;
+        } else {
+            return broadcastCost < partitionCost;
+        }
+    }
+
+    /**
      * Creates either a broadcast join or a repartitioning join, depending on 
the expected cost. If any of the inputs to
      * the cost computation is unknown, it assumes the cost will be 0. Costs 
being equal, it'll favor partitioned over
      * broadcast joins. If perNodeMemLimit > 0 and the size of the hash table 
for a broadcast join is expected to exceed
@@ -351,7 +365,7 @@ public class DistributedPlanner {
                 && (perNodeMemLimit == 0 || Math.round(
                 (double) rhsDataSize * PlannerContext.HASH_TBL_SPACE_OVERHEAD) 
<= perNodeMemLimit)
                 && (node.getInnerRef().isBroadcastJoin() || 
(!node.getInnerRef().isPartitionJoin()
-                && broadcastCost < partitionCost))) {
+                && isBroadcastCostSmaller(broadcastCost, partitionCost)))) {
             doBroadcast = true;
         } else {
             doBroadcast = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 393cdb0..c2a0b88 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -74,6 +74,7 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = 
"parallel_fragment_exec_instance_num";
     public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
     public static final String ENABLE_SPILLING = "enable_spilling";
+    public static final String PREFER_JOIN_METHOD = "prefer_join_method";
     public static final int MIN_EXEC_INSTANCE_NUM = 1;
     public static final int MAX_EXEC_INSTANCE_NUM = 32;
     // if set to true, some of stmt will be forwarded to master FE to get 
result
@@ -209,6 +210,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = DISABLE_COLOCATE_JOIN)
     private boolean disableColocateJoin = false;
 
+    @VariableMgr.VarAttr(name = PREFER_JOIN_METHOD)
+    private String preferJoinMethod = "broadcast";
+
     /*
      * the parallel exec instance num for one Fragment in one BE
      * 1 means disable this feature
@@ -397,6 +401,10 @@ public class SessionVariable implements Serializable, 
Writable {
         return disableColocateJoin;
     }
 
+    public String getPreferJoinMethod() {return preferJoinMethod; }
+
+    public void setPreferJoinMethod(String preferJoinMethod) 
{this.preferJoinMethod = preferJoinMethod; }
+
     public int getParallelExecInstanceNum() {
         return parallelExecInstanceNum;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
index 3574ae5..b48fbdc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
@@ -244,6 +244,12 @@ public class VariableMgr {
             }
         }
 
+        if (setVar.getVariable().toLowerCase().equals("prefer_join_method")) {
+            if (!value.toLowerCase().equals("broadcast") && 
!value.toLowerCase().equals("shuffle")) {
+                
ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, 
"prefer_join_method", value);
+            }
+        }
+
         if (setVar.getType() == SetType.GLOBAL) {
             wlock.lock();
             try {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 95fa585..433671e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -907,4 +907,23 @@ public class QueryPlanTest {
         explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
         Assert.assertFalse(explainString.contains("INNER JOIN (PARTITIONED)"));
     }
+
+    @Test
+    public void testPreferBroadcastJoin() throws Exception {
+        connectContext.setDatabase("default_cluster:test");
+        String queryStr = "explain select * from (select k1 from jointest 
group by k1)t2, jointest t1 where t1.k1 = t2.k1";
+        
+        // by default, prefer_join_method is "broadcast"
+        String explainString = 
UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(explainString.contains("INNER JOIN (BROADCAST)"));
+        
+        connectContext.getSessionVariable().setPreferJoinMethod("shuffle");
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
+        Assert.assertTrue(explainString.contains("INNER JOIN (PARTITIONED)"));
+
+    
+        connectContext.getSessionVariable().setPreferJoinMethod("broadcast");
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
+        Assert.assertTrue(explainString.contains("INNER JOIN (BROADCAST)"));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to