This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 4fbe855817 [feature](executor) using fe version to set instance_num (#22047) (#22352) 4fbe855817 is described below commit 4fbe855817a99038425fe36c2f840d87ab5752a9 Author: HappenLee <happen...@hotmail.com> AuthorDate: Fri Jul 28 21:14:15 2023 +0800 [feature](executor) using fe version to set instance_num (#22047) (#22352) --- .../pipeline-execution-engine.md | 1 + .../pipeline-execution-engine.md | 1 + .../main/java/org/apache/doris/catalog/Env.java | 75 +++++++++++++++++++++- .../java/org/apache/doris/qe/ConnectContext.java | 6 +- .../main/java/org/apache/doris/qe/VariableMgr.java | 19 ++++++ 5 files changed, 100 insertions(+), 2 deletions(-) diff --git a/docs/en/docs/query-acceleration/pipeline-execution-engine.md b/docs/en/docs/query-acceleration/pipeline-execution-engine.md index 5dbcecf2c5..d32ca42217 100644 --- a/docs/en/docs/query-acceleration/pipeline-execution-engine.md +++ b/docs/en/docs/query-acceleration/pipeline-execution-engine.md @@ -72,6 +72,7 @@ set enable_pipeline_engine = true; #### parallel_pipeline_task_num `parallel_pipeline_task_num` represents the concurrency of pipeline tasks of a query. The default value is `0` (i.e. half the number of CPU cores). Users can adjust this value according to their own workloads. +If the user upgrades from a lower version, the default value will be the value of `parallel_fragment_exec_instance_num` before the upgrade. 
``` set parallel_pipeline_task_num = 0; diff --git a/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md b/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md index 04fd788e3e..0ce0f65448 100644 --- a/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md +++ b/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md @@ -72,6 +72,7 @@ set enable_pipeline_engine = true; #### parallel_pipeline_task_num `parallel_pipeline_task_num`代表了 SQL 查询进行查询并发的 Pipeline Task 数目。Doris默认的配置为`0`,即CPU核数的一半。用户也可以实际根据自己的实际情况进行调整。 +如果用户从较低的版本升级过来,则默认值为升级前的`parallel_fragment_exec_instance_num`。 ``` set parallel_pipeline_task_num = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 52ecb88ea0..9f2797fe78 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -99,6 +99,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.ConfigBase; import org.apache.doris.common.ConfigException; import org.apache.doris.common.DdlException; +import org.apache.doris.common.EnvUtils; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; @@ -107,6 +108,7 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; +import org.apache.doris.common.Version; import org.apache.doris.common.io.CountingDataOutputStream; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.Daemon; @@ -289,10 +291,13 @@ public class Env { public static final String CLIENT_NODE_HOST_KEY = "CLIENT_NODE_HOST"; public static final String CLIENT_NODE_PORT_KEY = "CLIENT_NODE_PORT"; + private static final String VERSION_DIR = "/VERSION"; + private String latestFeVersion; + private 
String previousFeVersion; private String metaDir; private String bdbDir; private String imageDir; - + private String versionDir; private MetaContext metaContext; private long epoch = 0; @@ -851,6 +856,7 @@ public class Env { this.metaDir = Config.meta_dir; this.bdbDir = this.metaDir + BDB_DIR; this.imageDir = this.metaDir + IMAGE_DIR; + this.versionDir = EnvUtils.getDorisHome() + VERSION_DIR; // 0. get local node and helper node info getSelfHostPort(); @@ -870,12 +876,21 @@ public class Env { bdbDir.mkdirs(); } } + File imageDir = new File(this.imageDir); + if (!imageDir.exists()) { imageDir.mkdirs(); } + File verDir = new File(this.versionDir); + + if (!verDir.exists()) { + verDir.mkdirs(); + } + // init plugin manager + initVersionInfo(); pluginMgr.init(); auditEventProcessor.start(); @@ -5329,6 +5344,64 @@ public class Env { } } + public void writeVersionFile(String version, int seq) { + String versionName = versionDir + "/" + version + "-commitid-" + seq + "-version"; + File versionFile = new File(versionName); + try { + versionFile.createNewFile(); + } catch (Exception e) { + LOG.error(e.toString()); + } + } + + public boolean isMajorVersionUpgrade() { + if (previousFeVersion == null) { + // There are two possible scenarios when there is no 'previousFeVersion': + // If 'image' is empty, it indicates a completely new FE. + // If 'image' is not empty, it means an upgrade from a lower version. 
+ File imageDir = new File(this.imageDir); + File[] files = imageDir.listFiles(); + if (files == null || files.length == 0) { + return false; + } + return true; + } + return previousFeVersion.charAt(0) != latestFeVersion.charAt(0); + } + + private void initVersionInfo() { + latestFeVersion = Version.DORIS_BUILD_VERSION_MAJOR + "_" + Version.DORIS_BUILD_VERSION_MINOR + "_" + + Version.DORIS_BUILD_VERSION_PATCH; + File folder = new File(versionDir); + File[] files = folder.listFiles(); + int previousSeq = 0; + if (files != null) { + // Every part meaning (2_0_0-commitid-1-version) + // [version] - [commitid] - [seq] + // 'VersionFile' can be transformed like this. + // 2_0_0-commitid-1-version -> 2_1_0-commitid-2-version -> + // 2_3_0-commitid-3-version -> 2_0_0-commitid-4-version + // You can observe the process of FE upgrades through these files. + for (File file : files) { + String[] splitArr = file.getName().split("-"); + String version = splitArr[0]; + int seq = Integer.parseInt(splitArr[2]); + if (seq > previousSeq) { + previousSeq = seq; + previousFeVersion = version; + } + } + } + if (previousFeVersion == null) { + writeVersionFile(latestFeVersion, 1); + } else if (!previousFeVersion.equals(latestFeVersion)) { + writeVersionFile(latestFeVersion, previousSeq + 1); + } + if (isMajorVersionUpgrade()) { + ConnectContext.isMajorVersionUpgrade = true; + } + } + public int getFollowerCount() { int count = 0; for (Frontend fe : frontends.values()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 3387b702ed..04a9953a69 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -68,7 +68,7 @@ import java.util.Set; public class ConnectContext { private static final Logger LOG = LogManager.getLogger(ConnectContext.class); protected static ThreadLocal<ConnectContext> 
threadLocalInfo = new ThreadLocal<>(); - + public static boolean isMajorVersionUpgrade = false; private static final String SSL_PROTOCOL = "TLS"; // set this id before analyze @@ -263,6 +263,10 @@ public class ConnectContext { mysqlChannel = new DummyMysqlChannel(); } sessionVariable = VariableMgr.newSessionVariable(); + if (connection != null && isMajorVersionUpgrade) { + VariableMgr.setGlobalPipelineTask(sessionVariable.parallelExecInstanceNum); + sessionVariable = VariableMgr.newSessionVariable(); + } command = MysqlCommand.COM_SLEEP; if (Config.use_fuzzy_session_variable) { sessionVariable.initFuzzyModeVariables(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java index 24b7468e77..2d41eb821f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java @@ -367,6 +367,25 @@ public class VariableMgr { } } + public static void setGlobalPipelineTask(int instance) { + wlock.lock(); + try { + String name = "parallel_pipeline_task_num"; + String value = instance + ""; + VarContext ctx = ctxByVarName.get(name); + try { + setValue(ctx.getObj(), ctx.getField(), value); + } catch (DdlException e) { + LOG.error(e.toString()); + } + // write edit log + GlobalVarPersistInfo info = new GlobalVarPersistInfo(defaultSessionVariable, Lists.newArrayList(name)); + Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info); + } finally { + wlock.unlock(); + } + } + public static void setLowerCaseTableNames(int mode) throws DdlException { VarContext ctx = ctxByVarName.get(GlobalVariable.LOWER_CASE_TABLE_NAMES); setGlobalVarAndWriteEditLog(ctx, GlobalVariable.LOWER_CASE_TABLE_NAMES, "" + mode); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org