This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 4fbe855817 [feature](executor) using fe version to set instance_num 
(#22047) (#22352)
4fbe855817 is described below

commit 4fbe855817a99038425fe36c2f840d87ab5752a9
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Fri Jul 28 21:14:15 2023 +0800

    [feature](executor) using fe version to set instance_num (#22047) (#22352)
---
 .../pipeline-execution-engine.md                   |  1 +
 .../pipeline-execution-engine.md                   |  1 +
 .../main/java/org/apache/doris/catalog/Env.java    | 75 +++++++++++++++++++++-
 .../java/org/apache/doris/qe/ConnectContext.java   |  6 +-
 .../main/java/org/apache/doris/qe/VariableMgr.java | 19 ++++++
 5 files changed, 100 insertions(+), 2 deletions(-)

diff --git a/docs/en/docs/query-acceleration/pipeline-execution-engine.md 
b/docs/en/docs/query-acceleration/pipeline-execution-engine.md
index 5dbcecf2c5..d32ca42217 100644
--- a/docs/en/docs/query-acceleration/pipeline-execution-engine.md
+++ b/docs/en/docs/query-acceleration/pipeline-execution-engine.md
@@ -72,6 +72,7 @@ set enable_pipeline_engine = true;
 #### parallel_pipeline_task_num
 
 `parallel_pipeline_task_num` represents the concurrency of pipeline tasks of a 
query. Default value is `0` (e.g. half number of CPU cores). Users can adjust 
this value according to their own workloads.
+If the user upgraded from an earlier version, the default value is the value of 
`parallel_fragment_exec_instance_num` from before the upgrade.
 
 ```
 set parallel_pipeline_task_num = 0;
diff --git a/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md 
b/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md
index 04fd788e3e..0ce0f65448 100644
--- a/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md
+++ b/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md
@@ -72,6 +72,7 @@ set enable_pipeline_engine = true;
 #### parallel_pipeline_task_num
 
 `parallel_pipeline_task_num`代表了 SQL 查询进行查询并发的 Pipeline Task 
数目。Doris默认的配置为`0`,即CPU核数的一半。用户也可以实际根据自己的实际情况进行调整。
+如果用户从较低的版本升级过来,则默认值为升级前的`parallel_fragment_exec_instance_num`。
 
 ```
 set parallel_pipeline_task_num = 0;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 52ecb88ea0..9f2797fe78 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -99,6 +99,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.ConfigBase;
 import org.apache.doris.common.ConfigException;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.EnvUtils;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeConstants;
@@ -107,6 +108,7 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.Version;
 import org.apache.doris.common.io.CountingDataOutputStream;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.Daemon;
@@ -289,10 +291,13 @@ public class Env {
     public static final String CLIENT_NODE_HOST_KEY = "CLIENT_NODE_HOST";
     public static final String CLIENT_NODE_PORT_KEY = "CLIENT_NODE_PORT";
 
+    private static final String VERSION_DIR = "/VERSION";
+    private String latestFeVersion;
+    private String previousFeVersion;
     private String metaDir;
     private String bdbDir;
     private String imageDir;
-
+    private String versionDir;
     private MetaContext metaContext;
     private long epoch = 0;
 
@@ -851,6 +856,7 @@ public class Env {
         this.metaDir = Config.meta_dir;
         this.bdbDir = this.metaDir + BDB_DIR;
         this.imageDir = this.metaDir + IMAGE_DIR;
+        this.versionDir = EnvUtils.getDorisHome() + VERSION_DIR;
 
         // 0. get local node and helper node info
         getSelfHostPort();
@@ -870,12 +876,21 @@ public class Env {
                 bdbDir.mkdirs();
             }
         }
+
         File imageDir = new File(this.imageDir);
+
         if (!imageDir.exists()) {
             imageDir.mkdirs();
         }
 
+        File verDir = new File(this.versionDir);
+
+        if (!verDir.exists()) {
+            verDir.mkdirs();
+        }
+
         // init plugin manager
+        initVersionInfo();
         pluginMgr.init();
         auditEventProcessor.start();
 
@@ -5329,6 +5344,64 @@ public class Env {
         }
     }
 
+    /**
+     * Creates an empty marker file named {@code <version>-commitid-<seq>-version}
+     * inside {@code versionDir}. Any exception raised while creating the file is
+     * logged and not rethrown.
+     *
+     * @param version version string used as the first part of the file name
+     * @param seq sequence number embedded in the file name
+     */
+    public void writeVersionFile(String version, int seq) {
+        String versionName = versionDir + "/" + version + "-commitid-" + seq + "-version";
+        File versionFile = new File(versionName);
+        try {
+            versionFile.createNewFile();
+        } catch (Exception e) {
+            // Log the target path and the full stack trace, not just e.toString(),
+            // so a failed write can be diagnosed from the log alone.
+            LOG.error("failed to create version file " + versionName, e);
+        }
+    }
+
+    /**
+     * Tells whether this FE is running a different major version than the one
+     * recorded as {@code previousFeVersion}.
+     *
+     * @return {@code true} if the major components of {@code previousFeVersion} and
+     *         {@code latestFeVersion} differ, or if no previous version is recorded
+     *         but the image directory already contains files; {@code false} otherwise
+     */
+    public boolean isMajorVersionUpgrade() {
+        if (previousFeVersion == null) {
+            // There are two possible scenarios when there is no 'previousFeVersion':
+            // If 'image' is empty, it indicates a completely new FE.
+            // If 'image' is not empty, it means an upgrade from a lower version.
+            File[] files = new File(this.imageDir).listFiles();
+            return files != null && files.length > 0;
+        }
+        // Compare the whole major component rather than only the first character,
+        // so that e.g. "1_2_0" and "10_0_0" are not treated as the same major version.
+        return !majorVersionOf(previousFeVersion).equals(majorVersionOf(latestFeVersion));
+    }
+
+    /** Returns the part of {@code version} before the first '_' (the whole string if there is none). */
+    private static String majorVersionOf(String version) {
+        int sep = version.indexOf('_');
+        return sep < 0 ? version : version.substring(0, sep);
+    }
+
+    /**
+     * Builds {@code latestFeVersion} from the build constants, scans {@code versionDir}
+     * for the version file with the highest sequence number to determine
+     * {@code previousFeVersion}, writes a new version file when the version changed
+     * (or none existed), and raises {@code ConnectContext.isMajorVersionUpgrade} when
+     * {@link #isMajorVersionUpgrade()} reports an upgrade.
+     */
+    private void initVersionInfo() {
+        latestFeVersion = Version.DORIS_BUILD_VERSION_MAJOR + "_" + Version.DORIS_BUILD_VERSION_MINOR + "_"
+                + Version.DORIS_BUILD_VERSION_PATCH;
+        File folder = new File(versionDir);
+        File[] files = folder.listFiles();
+        int previousSeq = 0;
+        if (files != null) {
+            // File name layout: <version>-commitid-<seq>-version, e.g. 2_0_0-commitid-1-version
+            // ("commitid" and "version" are literal tokens, see writeVersionFile).
+            // A new file with seq + 1 is added each time the FE version changes, e.g.
+            // 2_0_0-commitid-1-version -> 2_1_0-commitid-2-version ->
+            // 2_3_0-commitid-3-version -> 2_0_0-commitid-4-version
+            // You can observe the process of FE upgrades through these files.
+            for (File file : files) {
+                String[] splitArr = file.getName().split("-");
+                if (splitArr.length < 3) {
+                    // Not a version file (e.g. a stray or hidden file): skip it instead of
+                    // failing FE startup with ArrayIndexOutOfBoundsException.
+                    LOG.warn("ignore unexpected file in version dir: " + file.getName());
+                    continue;
+                }
+                int seq;
+                try {
+                    seq = Integer.parseInt(splitArr[2]);
+                } catch (NumberFormatException e) {
+                    LOG.warn("ignore version file with invalid sequence number: " + file.getName());
+                    continue;
+                }
+                if (seq > previousSeq) {
+                    previousSeq = seq;
+                    previousFeVersion = splitArr[0];
+                }
+            }
+        }
+        if (previousFeVersion == null) {
+            writeVersionFile(latestFeVersion, 1);
+        } else if (!previousFeVersion.equals(latestFeVersion)) {
+            writeVersionFile(latestFeVersion, previousSeq + 1);
+        }
+        if (isMajorVersionUpgrade()) {
+            ConnectContext.isMajorVersionUpgrade = true;
+        }
+    }
+
     public int getFollowerCount() {
         int count = 0;
         for (Frontend fe : frontends.values()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 3387b702ed..04a9953a69 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -68,7 +68,7 @@ import java.util.Set;
 public class ConnectContext {
     private static final Logger LOG = 
LogManager.getLogger(ConnectContext.class);
     protected static ThreadLocal<ConnectContext> threadLocalInfo = new 
ThreadLocal<>();
-
+    public static boolean isMajorVersionUpgrade = false;
     private static final String SSL_PROTOCOL = "TLS";
 
     // set this id before analyze
@@ -263,6 +263,10 @@ public class ConnectContext {
             mysqlChannel = new DummyMysqlChannel();
         }
         sessionVariable = VariableMgr.newSessionVariable();
+        if (connection != null && isMajorVersionUpgrade) {
+            
VariableMgr.setGlobalPipelineTask(sessionVariable.parallelExecInstanceNum);
+            sessionVariable = VariableMgr.newSessionVariable();
+        }
         command = MysqlCommand.COM_SLEEP;
         if (Config.use_fuzzy_session_variable) {
             sessionVariable.initFuzzyModeVariables();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
index 24b7468e77..2d41eb821f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
@@ -367,6 +367,25 @@ public class VariableMgr {
         }
     }
 
+    /**
+     * Applies {@code instance} as the value of the {@code parallel_pipeline_task_num}
+     * variable and records the change in the edit log, all while holding {@code wlock}.
+     * If the variable is not registered or the value cannot be applied, the problem is
+     * logged and nothing is written to the edit log.
+     *
+     * @param instance value to assign to {@code parallel_pipeline_task_num}
+     */
+    public static void setGlobalPipelineTask(int instance) {
+        wlock.lock();
+        try {
+            String name = "parallel_pipeline_task_num";
+            VarContext ctx = ctxByVarName.get(name);
+            if (ctx == null) {
+                LOG.error("variable " + name + " is not registered, skip setting it");
+                return;
+            }
+            try {
+                setValue(ctx.getObj(), ctx.getField(), String.valueOf(instance));
+            } catch (DdlException e) {
+                // The value was not applied, so do not persist anything.
+                LOG.error("failed to set " + name + " to " + instance, e);
+                return;
+            }
+            // write edit log
+            GlobalVarPersistInfo info =
+                    new GlobalVarPersistInfo(defaultSessionVariable, Lists.newArrayList(name));
+            Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info);
+        } finally {
+            wlock.unlock();
+        }
+    }
+
     public static void setLowerCaseTableNames(int mode) throws DdlException {
         VarContext ctx = 
ctxByVarName.get(GlobalVariable.LOWER_CASE_TABLE_NAMES);
         setGlobalVarAndWriteEditLog(ctx, 
GlobalVariable.LOWER_CASE_TABLE_NAMES, "" + mode);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to