This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 630846b9740 [Improvement](execution) Use single phase execution commit if only 1 BE is used (#32937)
630846b9740 is described below

commit 630846b9740206e2b44af21d34772bccb3760485
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Mar 29 12:02:17 2024 +0800

    [Improvement](execution) Use single phase execution commit if only 1 BE is used (#32937)
---
 fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java |  9 ++++++---
 .../src/main/java/org/apache/doris/qe/SessionVariable.java    | 10 ++++++++++
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 301dfa78b70..75f898a5f15 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -892,14 +892,17 @@ public class Coordinator implements CoordInterface {
             int backendIdx = 0;
             int profileFragmentId = 0;
             beToPipelineExecCtxs.clear();
-            // If #fragments >=2, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start,
+            // If #fragments > 1 and BE amount is bigger than 1, use twoPhaseExecution with
+            // exec_plan_fragments_prepare and exec_plan_fragments_start,
             // else use exec_plan_fragments directly.
-            // we choose #fragments >=2 because in some cases
+            // we choose #fragments > 1 because in some cases
             // we need ensure that A fragment is already prepared to receive data before B fragment sends data.
             // For example: select * from numbers("number"="10") will generate ExchangeNode and
             // TableValuedFunctionScanNode, we should ensure TableValuedFunctionScanNode does not
             // send data until ExchangeNode is ready to receive.
-            boolean twoPhaseExecution = fragments.size() >= 2;
+            boolean twoPhaseExecution = ConnectContext.get() != null
+                    && ConnectContext.get().getSessionVariable().isEnableSinglePhaseExecutionCommitOpt()
+                    ? fragments.size() > 1 && addressToBackendID.size() > 1 : fragments.size() > 1;
             for (PlanFragment fragment : fragments) {
                 FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 67f35690914..92f464ba778 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -208,6 +208,8 @@ public class SessionVariable implements Serializable, Writable {

     public static final String ENABLE_BUSHY_TREE = "enable_bushy_tree";

+    public static final String ENABLE_SINGLE_PHASE_EXECUTION_COMMIT_OPT = "enable_single_phase_execution_commit_opt";
+
     public static final String MAX_JOIN_NUMBER_BUSHY_TREE = "max_join_number_bushy_tree";

     public static final String ENABLE_PARTITION_TOPN = "enable_partition_topn";
@@ -1186,6 +1188,10 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE, fuzzy = true, varType = VariableAnnotation.DEPRECATED)
     public boolean enableLocalExchange = true;

+    @VariableMgr.VarAttr(name = ENABLE_SINGLE_PHASE_EXECUTION_COMMIT_OPT, fuzzy = true,
+            varType = VariableAnnotation.DEPRECATED)
+    private boolean enableSinglePhaseExecutionCommitOpt = true;
+
     /**
      * For debug purpose, don't merge unique key and agg key when reading data.
      */
@@ -1740,6 +1746,7 @@ public class SessionVariable implements Serializable, Writable {
         this.parallelPipelineTaskNum = random.nextInt(8);
         this.enableCommonExprPushdown = random.nextBoolean();
         this.enableLocalExchange = random.nextBoolean();
+        this.enableSinglePhaseExecutionCommitOpt = random.nextBoolean();
         // This will cause be dead loop, disable it first
         // this.disableJoinReorder = random.nextBoolean();
         this.disableStreamPreaggregations = random.nextBoolean();
@@ -3606,4 +3613,7 @@ public class SessionVariable implements Serializable, Writable {
         return this.maxMsgSizeOfResultReceiver;
     }

+    public boolean isEnableSinglePhaseExecutionCommitOpt() {
+        return enableSinglePhaseExecutionCommitOpt;
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org