This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new a3fd3653aba [fix](http) throw RejectedExecutionException to prevent 
http hanging by Future (#29607) (#29679)
a3fd3653aba is described below

commit a3fd3653aba12f19664c03f45696a92cf814770a
Author: xueweizhang <zxw520bl...@163.com>
AuthorDate: Mon Jan 8 23:48:39 2024 +0800

    [fix](http) throw RejectedExecutionException to prevent http hanging by 
Future (#29607) (#29679)
---
 docs/en/docs/admin-manual/config/fe-config.md      | 12 ++++++++++
 docs/zh-CN/docs/admin-manual/config/fe-config.md   | 12 ++++++++++
 .../main/java/org/apache/doris/common/Config.java  | 12 ++++++++++
 .../org/apache/doris/common/ThreadPoolManager.java | 28 ++++++++++++++++++++--
 .../apache/doris/httpv2/util/LoadSubmitter.java    |  4 +++-
 .../doris/httpv2/util/StatementSubmitter.java      |  3 ++-
 6 files changed, 67 insertions(+), 4 deletions(-)

diff --git a/docs/en/docs/admin-manual/config/fe-config.md 
b/docs/en/docs/admin-manual/config/fe-config.md
index a34e3a56a62..4daa1516d8e 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -678,6 +678,18 @@ Default:1048576  (1M)
 
 http header size configuration parameter, the default value is 1M.
 
+#### `http_sql_submitter_max_worker_threads`
+
+Default:2
+
+The max number work threads of http sql submitter
+
+#### `http_load_submitter_max_worker_threads`
+
+Default:2
+
+The max number work threads of http upload submitter
+
 ### Query Engine
 
 #### `default_max_query_instances`
diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md 
b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index f1ebb92935d..80e56ffec13 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -678,6 +678,18 @@ workers 线程池默认不做设置,根据自己需要进行设置
 
 http header size 配置参数
 
+#### `http_sql_submitter_max_worker_threads`
+
+默认值:2
+
+http请求处理/api/query中sql任务的最大线程池
+
+#### `http_load_submitter_max_worker_threads`
+
+默认值:2
+
+http请求处理/api/upload任务的最大线程池
+
 ### 查询引擎
 
 #### `default_max_query_instances`
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 9bc9bb1be9f..025b6f91bf8 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2261,4 +2261,16 @@ public class Config extends ConfigBase {
     @ConfField(description = {"是否开启通过http接口获取log文件的功能",
             "Whether to enable the function of getting log files through http 
interface"})
     public static boolean enable_get_log_file_api = false;
+
+    @ConfField(mutable = false, masterOnly = false, description = {
+        "http请求处理/api/query中sql任务的最大线程池。",
+        "The max number work threads of http sql submitter."
+    })
+    public static int http_sql_submitter_max_worker_threads = 2;
+
+    @ConfField(mutable = false, masterOnly = false, description = {
+        "http请求处理/api/upload任务的最大线程池。",
+        "The max number work threads of http upload submitter."
+    })
+    public static int http_load_submitter_max_worker_threads = 2;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
index 31d608b8a25..0ad71112810 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
@@ -119,6 +119,13 @@ public class ThreadPoolManager {
                 new LogDiscardPolicy(poolName), poolName, needRegisterMetric);
     }
 
+    public static ThreadPoolExecutor 
newDaemonCacheThreadPoolThrowException(int maxNumThread,
+                                                              String poolName, 
boolean needRegisterMetric) {
+        return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME,
+            TimeUnit.SECONDS, new SynchronousQueue(),
+            new LogDiscardPolicyThrowException(poolName), poolName, 
needRegisterMetric);
+    }
+
     public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread,
             int queueSize, String poolName, boolean needRegisterMetric) {
         return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME, 
TimeUnit.SECONDS,
@@ -285,8 +292,8 @@ public class ThreadPoolManager {
 
         private static final Logger LOG = 
LogManager.getLogger(LogDiscardPolicy.class);
 
-        private String threadPoolName;
-        private AtomicLong rejectedNum;
+        public String threadPoolName;
+        public AtomicLong rejectedNum;
 
         public LogDiscardPolicy(String threadPoolName) {
             this.threadPoolName = threadPoolName;
@@ -300,6 +307,23 @@ public class ThreadPoolManager {
         }
     }
 
+    static class LogDiscardPolicyThrowException extends LogDiscardPolicy {
+
+        private static final Logger LOG = 
LogManager.getLogger(LogDiscardPolicyThrowException.class);
+
+        public LogDiscardPolicyThrowException(String threadPoolName) {
+            super(threadPoolName);
+        }
+
+        @Override
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 
{
+            LOG.warn("Task " + r.toString() + " rejected from " + 
threadPoolName + " " + executor.toString());
+            this.rejectedNum.incrementAndGet();
+            throw new RejectedExecutionException("Task " + r.toString() + " 
rejected from "
+                                                + threadPoolName + " " + 
executor.toString());
+        }
+    }
+
     /**
      * A handler for rejected task that try to be blocked until the pool 
enqueue task succeed or timeout,
      * used for fixed thread pool
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
index bf23138e0da..fc5f85743bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
@@ -19,6 +19,7 @@ package org.apache.doris.httpv2.util;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.util.NetUtils;
@@ -53,7 +54,8 @@ import java.util.concurrent.ThreadPoolExecutor;
 public class LoadSubmitter {
     private static final Logger LOG = 
LogManager.getLogger(LoadSubmitter.class);
 
-    private ThreadPoolExecutor executor = 
ThreadPoolManager.newDaemonCacheThreadPool(2, "load-submitter", true);
+    private ThreadPoolExecutor executor = 
ThreadPoolManager.newDaemonCacheThreadPoolThrowException(
+                        Config.http_load_submitter_max_worker_threads, 
"load-submitter", true);
 
     private static final String STREAM_LOAD_URL_PATTERN = 
"http://%s/api/%s/%s/_stream_load";;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java
index 5817f1d252e..bfc89324f2f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java
@@ -73,7 +73,8 @@ public class StatementSubmitter {
     private static final String JDBC_DRIVER = "org.mariadb.jdbc.Driver";
     private static final String DB_URL_PATTERN = 
"jdbc:mariadb://127.0.0.1:%d/%s";
 
-    private ThreadPoolExecutor executor = 
ThreadPoolManager.newDaemonCacheThreadPool(2, "SQL submitter", true);
+    private ThreadPoolExecutor executor = 
ThreadPoolManager.newDaemonCacheThreadPoolThrowException(
+                        Config.http_sql_submitter_max_worker_threads, "SQL 
submitter", true);
 
     public Future<ExecutionResultSet> submit(StmtContext queryCtx) {
         Worker worker = new Worker(ConnectContext.get(), queryCtx);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to