This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new a3fd3653aba [fix](http) throw RejectedExecutionException to prevent http hanging by Future (#29607) (#29679) a3fd3653aba is described below commit a3fd3653aba12f19664c03f45696a92cf814770a Author: xueweizhang <zxw520bl...@163.com> AuthorDate: Mon Jan 8 23:48:39 2024 +0800 [fix](http) throw RejectedExecutionException to prevent http hanging by Future (#29607) (#29679) --- docs/en/docs/admin-manual/config/fe-config.md | 12 ++++++++++ docs/zh-CN/docs/admin-manual/config/fe-config.md | 12 ++++++++++ .../main/java/org/apache/doris/common/Config.java | 12 ++++++++++ .../org/apache/doris/common/ThreadPoolManager.java | 28 ++++++++++++++++++++-- .../apache/doris/httpv2/util/LoadSubmitter.java | 4 +++- .../doris/httpv2/util/StatementSubmitter.java | 3 ++- 6 files changed, 67 insertions(+), 4 deletions(-) diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md index a34e3a56a62..4daa1516d8e 100644 --- a/docs/en/docs/admin-manual/config/fe-config.md +++ b/docs/en/docs/admin-manual/config/fe-config.md @@ -678,6 +678,18 @@ Default:1048576 (1M) http header size configuration parameter, the default value is 1M. +#### `http_sql_submitter_max_worker_threads` + +Default:2 + +The max number work threads of http sql submitter + +#### `http_load_submitter_max_worker_threads` + +Default:2 + +The max number work threads of http upload submitter + ### Query Engine #### `default_max_query_instances` diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md index f1ebb92935d..80e56ffec13 100644 --- a/docs/zh-CN/docs/admin-manual/config/fe-config.md +++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md @@ -678,6 +678,18 @@ workers 线程池默认不做设置,根据自己需要进行设置 http header size 配置参数 +#### `http_sql_submitter_max_worker_threads` + +默认值:2 + +http请求处理/api/query中sql任务的最大线程池 + +#### `http_load_submitter_max_worker_threads` + +默认值:2 + +http请求处理/api/upload任务的最大线程池 + ### 查询引擎 #### `default_max_query_instances` diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 9bc9bb1be9f..025b6f91bf8 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2261,4 +2261,16 @@ public class Config extends ConfigBase { @ConfField(description = {"是否开启通过http接口获取log文件的功能", "Whether to enable the function of getting log files through http interface"}) public static boolean enable_get_log_file_api = false; + + @ConfField(mutable = false, masterOnly = false, description = { + "http请求处理/api/query中sql任务的最大线程池。", + "The max number work threads of http sql submitter." + }) + public static int http_sql_submitter_max_worker_threads = 2; + + @ConfField(mutable = false, masterOnly = false, description = { + "http请求处理/api/upload任务的最大线程池。", + "The max number work threads of http upload submitter." + }) + public static int http_load_submitter_max_worker_threads = 2; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java index 31d608b8a25..0ad71112810 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java @@ -119,6 +119,13 @@ public class ThreadPoolManager { new LogDiscardPolicy(poolName), poolName, needRegisterMetric); } + public static ThreadPoolExecutor newDaemonCacheThreadPoolThrowException(int maxNumThread, + String poolName, boolean needRegisterMetric) { + return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME, + TimeUnit.SECONDS, new SynchronousQueue(), + new LogDiscardPolicyThrowException(poolName), poolName, needRegisterMetric); + } + public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, String poolName, boolean needRegisterMetric) { return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, @@ -285,8 +292,8 @@ public class ThreadPoolManager { private static final Logger LOG = LogManager.getLogger(LogDiscardPolicy.class); - private String threadPoolName; - private AtomicLong rejectedNum; + public String threadPoolName; + public AtomicLong rejectedNum; public LogDiscardPolicy(String threadPoolName) { this.threadPoolName = threadPoolName; @@ -300,6 +307,23 @@ public class ThreadPoolManager { } } + static class LogDiscardPolicyThrowException extends LogDiscardPolicy { + + private static final Logger LOG = LogManager.getLogger(LogDiscardPolicyThrowException.class); + + public LogDiscardPolicyThrowException(String threadPoolName) { + super(threadPoolName); + } + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + LOG.warn("Task " + r.toString() + " rejected from " + threadPoolName + " " + executor.toString()); + this.rejectedNum.incrementAndGet(); + throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + + threadPoolName + " " + executor.toString()); + } + } + /** * A handler for rejected task that try to be blocked until the pool enqueue task succeed or timeout, * used for fixed thread pool diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java index bf23138e0da..fc5f85743bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java @@ -19,6 +19,7 @@ package org.apache.doris.httpv2.util; import org.apache.doris.catalog.Env; import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.Config; import org.apache.doris.common.LoadException; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.util.NetUtils; @@ -53,7 +54,8 @@ import java.util.concurrent.ThreadPoolExecutor; public class LoadSubmitter { private static final Logger LOG = LogManager.getLogger(LoadSubmitter.class); - private ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPool(2, "load-submitter", true); + private ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPoolThrowException( + Config.http_load_submitter_max_worker_threads, "load-submitter", true); private static final String STREAM_LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java index 5817f1d252e..bfc89324f2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java @@ -73,7 +73,8 @@ public class StatementSubmitter { private static final String JDBC_DRIVER = "org.mariadb.jdbc.Driver"; private static final String DB_URL_PATTERN = "jdbc:mariadb://127.0.0.1:%d/%s"; - private ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPool(2, "SQL submitter", true); + private ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPoolThrowException( + Config.http_sql_submitter_max_worker_threads, "SQL submitter", true); public Future<ExecutionResultSet> submit(StmtContext queryCtx) { Worker worker = new Worker(ConnectContext.get(), queryCtx); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org