This is an automated email from the ASF dual-hosted git repository.

lide pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new caf61f8a21d [fix](http) throw RejectedExecutionException to prevent 
http hanging by Future (#29651)
caf61f8a21d is described below

commit caf61f8a21d9c32a383d28406c8a377437a59475
Author: xueweizhang <zxw520bl...@163.com>
AuthorDate: Mon Jan 8 11:17:49 2024 +0800

    [fix](http) throw RejectedExecutionException to prevent http hanging by 
Future (#29651)
---
 docs/en/docs/admin-manual/config/fe-config.md      | 12 ++++++++++
 docs/zh-CN/docs/admin-manual/config/fe-config.md   | 12 ++++++++++
 .../main/java/org/apache/doris/common/Config.java  |  6 +++++
 .../org/apache/doris/common/ThreadPoolManager.java | 28 ++++++++++++++++++++--
 .../apache/doris/httpv2/util/LoadSubmitter.java    |  4 +++-
 .../doris/httpv2/util/StatementSubmitter.java      |  3 ++-
 6 files changed, 61 insertions(+), 4 deletions(-)

diff --git a/docs/en/docs/admin-manual/config/fe-config.md 
b/docs/en/docs/admin-manual/config/fe-config.md
index 91daa268db1..482ab26363e 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -705,6 +705,18 @@ trace export to zipkin like: 
`http://127.0.0.1:9411/api/v2/spans`
 
 trace export to collector like: `http://127.0.0.1:4318/v1/traces`
 
+#### `http_sql_submitter_max_worker_threads`
+
+Default:2
+
+The max number work threads of http sql submitter
+
+#### `http_load_submitter_max_worker_threads`
+
+Default:2
+
+The max number work threads of http upload submitter
+
 ### Query Engine
 
 #### `default_max_query_instances`
diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md 
b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index c0f04e0231a..23a15336506 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -705,6 +705,18 @@ trace导出到 zipkin: `http://127.0.0.1:9411/api/v2/spans`
 
 trace导出到 collector: `http://127.0.0.1:4318/v1/traces`
 
+#### `http_sql_submitter_max_worker_threads`
+
+默认值:2
+
+http请求处理/api/query中sql任务的最大线程池
+
+#### `http_load_submitter_max_worker_threads`
+
+默认值:2
+
+http请求处理/api/upload任务的最大线程池
+
 ### 查询引擎
 
 #### `default_max_query_instances`
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 0bf8da05c27..8bfe39ea990 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2051,5 +2051,11 @@ public class Config extends ConfigBase {
 
     @ConfField(mutable = true, masterOnly = true)
     public static int restore_download_task_num_per_be = 3;
+
+    @ConfField(mutable = false, masterOnly = false)
+    public static int http_sql_submitter_max_worker_threads = 2;
+
+    @ConfField(mutable = false, masterOnly = false)
+    public static int http_load_submitter_max_worker_threads = 2;
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
index 8d1b1e7da7b..0f242d422f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
@@ -113,6 +113,13 @@ public class ThreadPoolManager {
                 new LogDiscardPolicy(poolName), poolName, needRegisterMetric);
     }
 
+    public static ThreadPoolExecutor 
newDaemonCacheThreadPoolThrowException(int maxNumThread,
+                                                              String poolName, 
boolean needRegisterMetric) {
+        return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME,
+            TimeUnit.SECONDS, new SynchronousQueue(),
+            new LogDiscardPolicyThrowException(poolName), poolName, 
needRegisterMetric);
+    }
+
     public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread,
             int queueSize, String poolName, boolean needRegisterMetric) {
         return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME, 
TimeUnit.SECONDS,
@@ -172,8 +179,8 @@ public class ThreadPoolManager {
 
         private static final Logger LOG = 
LogManager.getLogger(LogDiscardPolicy.class);
 
-        private String threadPoolName;
-        private AtomicLong rejectedNum;
+        public String threadPoolName;
+        public AtomicLong rejectedNum;
 
         public LogDiscardPolicy(String threadPoolName) {
             this.threadPoolName = threadPoolName;
@@ -187,6 +194,23 @@ public class ThreadPoolManager {
         }
     }
 
+    static class LogDiscardPolicyThrowException extends LogDiscardPolicy {
+
+        private static final Logger LOG = 
LogManager.getLogger(LogDiscardPolicyThrowException.class);
+
+        public LogDiscardPolicyThrowException(String threadPoolName) {
+            super(threadPoolName);
+        }
+
+        @Override
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 
{
+            LOG.warn("Task " + r.toString() + " rejected from " + 
threadPoolName + " " + executor.toString());
+            this.rejectedNum.incrementAndGet();
+            throw new RejectedExecutionException("Task " + r.toString() + " 
rejected from "
+                                                + threadPoolName + " " + 
executor.toString());
+        }
+    }
+
     /**
      * A handler for rejected task that try to be blocked until the pool 
enqueue task succeed or timeout,
      * used for fixed thread pool
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
index e971c3bc077..63706af892d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
@@ -19,6 +19,7 @@ package org.apache.doris.httpv2.util;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.httpv2.rest.UploadAction;
@@ -52,7 +53,8 @@ import java.util.concurrent.ThreadPoolExecutor;
 public class LoadSubmitter {
     private static final Logger LOG = 
LogManager.getLogger(LoadSubmitter.class);
 
-    private ThreadPoolExecutor executor = 
ThreadPoolManager.newDaemonCacheThreadPool(2, "load-submitter", true);
+    private ThreadPoolExecutor executor = 
ThreadPoolManager.newDaemonCacheThreadPoolThrowException(
+                        Config.http_load_submitter_max_worker_threads, 
"load-submitter", true);
 
     private static final String STREAM_LOAD_URL_PATTERN = 
"http://%s:%d/api/%s/%s/_stream_load";;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java
index 5817f1d252e..bfc89324f2f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java
@@ -73,7 +73,8 @@ public class StatementSubmitter {
     private static final String JDBC_DRIVER = "org.mariadb.jdbc.Driver";
     private static final String DB_URL_PATTERN = 
"jdbc:mariadb://127.0.0.1:%d/%s";
 
-    private ThreadPoolExecutor executor = 
ThreadPoolManager.newDaemonCacheThreadPool(2, "SQL submitter", true);
+    private ThreadPoolExecutor executor = 
ThreadPoolManager.newDaemonCacheThreadPoolThrowException(
+                        Config.http_sql_submitter_max_worker_threads, "SQL 
submitter", true);
 
     public Future<ExecutionResultSet> submit(StmtContext queryCtx) {
         Worker worker = new Worker(ConnectContext.get(), queryCtx);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to