This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 839b469879 [fix](meta) set parallel_pipeline_task_num when upgrading 
from 1.2 to 2.0 (#22618)
839b469879 is described below

commit 839b4698791e94a998d2269feaa3736c4bb8819e
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Sat Aug 5 11:04:39 2023 +0800

    [fix](meta) set parallel_pipeline_task_num when upgrading from 1.2 to 2.0 
(#22618)
---
 .../src/main/java/org/apache/doris/catalog/Env.java  | 12 ++++++++++++
 .../main/java/org/apache/doris/qe/VariableMgr.java   | 20 ++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 50c094beb2..9a286ad98c 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1365,6 +1365,18 @@ public class Env {
             editLog.logAddFirstFrontend(self);
 
             initLowerCaseTableNames();
+        } else {
+            if (journalVersion <= FeMetaVersion.VERSION_114) {
+                // If the journal version is not greater than 114, the FE is being 
upgraded from a version before 2.0.
+                // When upgrading from 1.2 to 2.0, we need to make sure that 
the parallelism of queries remains unchanged
+                // when switching to the pipeline engine; otherwise it may impact the 
load of the entire cluster,
+                // because the default parallelism of the pipeline engine is 
higher than in previous versions.
+                // So set parallel_pipeline_task_num to 
parallel_fragment_exec_instance_num.
+                int newVal = 
VariableMgr.newSessionVariable().parallelExecInstanceNum;
+                VariableMgr.setGlobalPipelineTask(newVal);
+                LOG.info("upgrade FE from 1.x to 2.0, set 
parallel_pipeline_task_num "
+                        + "to parallel_fragment_exec_instance_num: {}", 
newVal);
+            }
         }
 
         getPolicyMgr().createDefaultStoragePolicy();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
index 24b7468e77..853e0679e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
@@ -367,6 +367,26 @@ public class VariableMgr {
         }
     }
 
+    public static void setGlobalPipelineTask(int instance) { // Sets the variable registered as PARALLEL_PIPELINE_TASK_NUM to `instance` and logs the change to the edit log.
+        wlock.lock(); // held until the finally block, covering both the value update and the edit log write
+        try {
+            VarContext ctx = 
ctxByVarName.get(SessionVariable.PARALLEL_PIPELINE_TASK_NUM); // look up the variable's context by name
+            try {
+                setValue(ctx.getObj(), ctx.getField(), 
String.valueOf(instance)); // setValue takes the new value as a string
+            } catch (DdlException e) {
+                LOG.warn("failed to set global variable: {}", 
SessionVariable.PARALLEL_PIPELINE_TASK_NUM, e);
+                return; // on failure only a warning is logged; no edit log entry is written
+            }
+
+            // write edit log: the entry names the changed variable and is built from defaultSessionVariable
+            GlobalVarPersistInfo info = new 
GlobalVarPersistInfo(defaultSessionVariable,
+                    
Lists.newArrayList(SessionVariable.PARALLEL_PIPELINE_TASK_NUM));
+            Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info);
+        } finally {
+            wlock.unlock(); // always released, including on the early return above
+        }
+    }
+
     public static void setLowerCaseTableNames(int mode) throws DdlException {
         VarContext ctx = 
ctxByVarName.get(GlobalVariable.LOWER_CASE_TABLE_NAMES);
         setGlobalVarAndWriteEditLog(ctx, 
GlobalVariable.LOWER_CASE_TABLE_NAMES, "" + mode);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to