This is an automated email from the ASF dual-hosted git repository. lide pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push: new caf61f8a21d [fix](http) throw RejectedExecutionException to prevent http hanging by Future (#29651) caf61f8a21d is described below commit caf61f8a21d9c32a383d28406c8a377437a59475 Author: xueweizhang <zxw520bl...@163.com> AuthorDate: Mon Jan 8 11:17:49 2024 +0800 [fix](http) throw RejectedExecutionException to prevent http hanging by Future (#29651) --- docs/en/docs/admin-manual/config/fe-config.md | 12 ++++++++++ docs/zh-CN/docs/admin-manual/config/fe-config.md | 12 ++++++++++ .../main/java/org/apache/doris/common/Config.java | 6 +++++ .../org/apache/doris/common/ThreadPoolManager.java | 28 ++++++++++++++++++++-- .../apache/doris/httpv2/util/LoadSubmitter.java | 4 +++- .../doris/httpv2/util/StatementSubmitter.java | 3 ++- 6 files changed, 61 insertions(+), 4 deletions(-) diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md index 91daa268db1..482ab26363e 100644 --- a/docs/en/docs/admin-manual/config/fe-config.md +++ b/docs/en/docs/admin-manual/config/fe-config.md @@ -705,6 +705,18 @@ trace export to zipkin like: `http://127.0.0.1:9411/api/v2/spans` trace export to collector like: `http://127.0.0.1:4318/v1/traces` +#### `http_sql_submitter_max_worker_threads` + +Default:2 + +The maximum number of worker threads for the HTTP SQL submitter + +#### `http_load_submitter_max_worker_threads` + +Default:2 + +The maximum number of worker threads for the HTTP upload submitter + ### Query Engine #### `default_max_query_instances` diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md index c0f04e0231a..23a15336506 100644 --- a/docs/zh-CN/docs/admin-manual/config/fe-config.md +++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md @@ -705,6 +705,18 @@ trace导出到 zipkin: `http://127.0.0.1:9411/api/v2/spans` trace导出到 collector: `http://127.0.0.1:4318/v1/traces` +#### `http_sql_submitter_max_worker_threads` + +默认值:2 + 
+http请求处理/api/query中sql任务的最大线程池 + +#### `http_load_submitter_max_worker_threads` + +默认值:2 + +http请求处理/api/upload任务的最大线程池 + ### 查询引擎 #### `default_max_query_instances` diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 0bf8da05c27..8bfe39ea990 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2051,5 +2051,11 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int restore_download_task_num_per_be = 3; + + @ConfField(mutable = false, masterOnly = false) + public static int http_sql_submitter_max_worker_threads = 2; + + @ConfField(mutable = false, masterOnly = false) + public static int http_load_submitter_max_worker_threads = 2; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java index 8d1b1e7da7b..0f242d422f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java @@ -113,6 +113,13 @@ public class ThreadPoolManager { new LogDiscardPolicy(poolName), poolName, needRegisterMetric); } + public static ThreadPoolExecutor newDaemonCacheThreadPoolThrowException(int maxNumThread, + String poolName, boolean needRegisterMetric) { + return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME, + TimeUnit.SECONDS, new SynchronousQueue(), + new LogDiscardPolicyThrowException(poolName), poolName, needRegisterMetric); + } + public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, String poolName, boolean needRegisterMetric) { return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, @@ -172,8 +179,8 @@ public class ThreadPoolManager { private static final Logger LOG = 
LogManager.getLogger(LogDiscardPolicy.class); - private String threadPoolName; - private AtomicLong rejectedNum; + public String threadPoolName; + public AtomicLong rejectedNum; public LogDiscardPolicy(String threadPoolName) { this.threadPoolName = threadPoolName; @@ -187,6 +194,23 @@ public class ThreadPoolManager { } } + static class LogDiscardPolicyThrowException extends LogDiscardPolicy { + + private static final Logger LOG = LogManager.getLogger(LogDiscardPolicyThrowException.class); + + public LogDiscardPolicyThrowException(String threadPoolName) { + super(threadPoolName); + } + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + LOG.warn("Task " + r.toString() + " rejected from " + threadPoolName + " " + executor.toString()); + this.rejectedNum.incrementAndGet(); + throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + + threadPoolName + " " + executor.toString()); + } + } + /** * A handler for rejected task that try to be blocked until the pool enqueue task succeed or timeout, * used for fixed thread pool diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java index e971c3bc077..63706af892d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java @@ -19,6 +19,7 @@ package org.apache.doris.httpv2.util; import org.apache.doris.catalog.Env; import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.Config; import org.apache.doris.common.LoadException; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.httpv2.rest.UploadAction; @@ -52,7 +53,8 @@ import java.util.concurrent.ThreadPoolExecutor; public class LoadSubmitter { private static final Logger LOG = LogManager.getLogger(LoadSubmitter.class); - private ThreadPoolExecutor executor = 
ThreadPoolManager.newDaemonCacheThreadPool(2, "load-submitter", true); + private ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPoolThrowException( + Config.http_load_submitter_max_worker_threads, "load-submitter", true); private static final String STREAM_LOAD_URL_PATTERN = "http://%s:%d/api/%s/%s/_stream_load"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java index 5817f1d252e..bfc89324f2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java @@ -73,7 +73,8 @@ public class StatementSubmitter { private static final String JDBC_DRIVER = "org.mariadb.jdbc.Driver"; private static final String DB_URL_PATTERN = "jdbc:mariadb://127.0.0.1:%d/%s"; - private ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPool(2, "SQL submitter", true); + private ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPoolThrowException( + Config.http_sql_submitter_max_worker_threads, "SQL submitter", true); public Future<ExecutionResultSet> submit(StmtContext queryCtx) { Worker worker = new Worker(ConnectContext.get(), queryCtx); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org