This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 630846b9740 [Improvement](execution) Use single phase execution commit if only 1 BE is used (#32937)
630846b9740 is described below

commit 630846b9740206e2b44af21d34772bccb3760485
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Mar 29 12:02:17 2024 +0800

    [Improvement](execution) Use single phase execution commit if only 1 BE is used (#32937)
---
 fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java  |  9 ++++++---
 .../src/main/java/org/apache/doris/qe/SessionVariable.java     | 10 ++++++++++
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 301dfa78b70..75f898a5f15 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -892,14 +892,17 @@ public class Coordinator implements CoordInterface {
             int backendIdx = 0;
             int profileFragmentId = 0;
             beToPipelineExecCtxs.clear();
-            // If #fragments >=2, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start,
+            // If #fragments > 1 and BE amount is bigger than 1, use twoPhaseExecution with
+            // exec_plan_fragments_prepare and exec_plan_fragments_start,
             // else use exec_plan_fragments directly.
-            // we choose #fragments >=2 because in some cases
+            // we choose #fragments > 1 because in some cases
             // we need ensure that A fragment is already prepared to receive data before B fragment sends data.
             // For example: select * from numbers("number"="10") will generate ExchangeNode and
             // TableValuedFunctionScanNode, we should ensure TableValuedFunctionScanNode does not
             // send data until ExchangeNode is ready to receive.
-            boolean twoPhaseExecution = fragments.size() >= 2;
+            boolean twoPhaseExecution = ConnectContext.get() != null
+                    && 
ConnectContext.get().getSessionVariable().isEnableSinglePhaseExecutionCommitOpt()
+                    ? fragments.size() > 1 && addressToBackendID.size() > 1 : 
fragments.size() > 1;
             for (PlanFragment fragment : fragments) {
                 FragmentExecParams params = 
fragmentExecParamsMap.get(fragment.getFragmentId());
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 67f35690914..92f464ba778 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -208,6 +208,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String ENABLE_BUSHY_TREE = "enable_bushy_tree";
 
+    public static final String ENABLE_SINGLE_PHASE_EXECUTION_COMMIT_OPT = 
"enable_single_phase_execution_commit_opt";
+
     public static final String MAX_JOIN_NUMBER_BUSHY_TREE = 
"max_join_number_bushy_tree";
     public static final String ENABLE_PARTITION_TOPN = "enable_partition_topn";
 
@@ -1186,6 +1188,10 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE, fuzzy = true, varType = 
VariableAnnotation.DEPRECATED)
     public boolean enableLocalExchange = true;
 
+    @VariableMgr.VarAttr(name = ENABLE_SINGLE_PHASE_EXECUTION_COMMIT_OPT, 
fuzzy = true,
+            varType = VariableAnnotation.DEPRECATED)
+    private boolean enableSinglePhaseExecutionCommitOpt = true;
+
     /**
      * For debug purpose, don't merge unique key and agg key when reading data.
      */
@@ -1740,6 +1746,7 @@ public class SessionVariable implements Serializable, 
Writable {
         this.parallelPipelineTaskNum = random.nextInt(8);
         this.enableCommonExprPushdown = random.nextBoolean();
         this.enableLocalExchange = random.nextBoolean();
+        this.enableSinglePhaseExecutionCommitOpt = random.nextBoolean();
         // This will cause be dead loop, disable it first
         // this.disableJoinReorder = random.nextBoolean();
         this.disableStreamPreaggregations = random.nextBoolean();
@@ -3606,4 +3613,7 @@ public class SessionVariable implements Serializable, 
Writable {
         return this.maxMsgSizeOfResultReceiver;
     }
 
+    public boolean isEnableSinglePhaseExecutionCommitOpt() {
+        return enableSinglePhaseExecutionCommitOpt;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to