This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 2eb85bc55e9 [opt](routine-load) optimize routine load task thread pool and related param(#32282) (#37443)
2eb85bc55e9 is described below

commit 2eb85bc55e965ae7e071bcfef92dff33b4f236af
Author: hui lai <1353307...@qq.com>
AuthorDate: Tue Jul 9 10:48:51 2024 +0800

    [opt](routine-load) optimize routine load task thread pool and related param(#32282) (#37443)
    
    pick (#32282)
---
 be/src/common/config.cpp                           |  4 ++--
 be/src/common/config.h                             |  4 ++--
 be/src/runtime/exec_env_init.cpp                   |  1 +
 .../routine_load/routine_load_task_executor.cpp    | 25 +++++++++++++---------
 .../routine_load/routine_load_task_executor.h      | 10 ++++++---
 .../runtime/routine_load_task_executor_test.cpp    |  7 +++---
 .../main/java/org/apache/doris/common/Config.java  |  4 ++--
 7 files changed, 33 insertions(+), 22 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 7fc4bc9c5d6..ee9201bf6b5 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -604,9 +604,9 @@ DEFINE_Bool(enable_metric_calculator, "true");
 // max consumer num in one data consumer group, for routine load
 DEFINE_mInt32(max_consumer_num_per_group, "3");
 
-// the size of thread pool for routine load task.
+// the max size of thread pool for routine load task.
 // this should be larger than FE config 'max_routine_load_task_num_per_be' (default 5)
-DEFINE_Int32(routine_load_thread_pool_size, "10");
+DEFINE_Int32(max_routine_load_thread_pool_size, "1024");
 
 // max external scan cache batch count, means cache max_memory_cache_batch_count * batch_size row
 // default is 20, batch_size's default value is 1024 means 20 * 1024 rows will be cached
diff --git a/be/src/common/config.h b/be/src/common/config.h
index c762df19f53..dc750fc511f 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -655,9 +655,9 @@ DECLARE_Bool(enable_metric_calculator);
 // max consumer num in one data consumer group, for routine load
 DECLARE_mInt32(max_consumer_num_per_group);
 
-// the size of thread pool for routine load task.
+// the max size of thread pool for routine load task.
 // this should be larger than FE config 'max_routine_load_task_num_per_be' (default 5)
-DECLARE_Int32(routine_load_thread_pool_size);
+DECLARE_Int32(max_routine_load_thread_pool_size);
 
 // max external scan cache batch count, means cache max_memory_cache_batch_count * batch_size row
 // default is 20, batch_size's default value is 1024 means 20 * 1024 rows will be cached
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 48f1fc0651b..560466f89e2 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -166,6 +166,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
     _function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
     _stream_load_executor = StreamLoadExecutor::create_shared(this);
     _routine_load_task_executor = new RoutineLoadTaskExecutor(this);
+    RETURN_IF_ERROR(_routine_load_task_executor->init());
     _small_file_mgr = new SmallFileMgr(this, config::small_file_dir);
     _block_spill_mgr = new BlockSpillManager(_store_paths);
     _file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num);
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index ebf7f256e5b..0d59f27ef4b 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -63,10 +63,7 @@ using namespace ErrorCode;
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(routine_load_task_count, MetricUnit::NOUNIT);
 
 RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* exec_env)
-        : _exec_env(exec_env),
-          _thread_pool(config::routine_load_thread_pool_size, config::routine_load_thread_pool_size,
-                       "routine_load"),
-          _data_consumer_pool(config::routine_load_consumer_pool_size) {
+        : _exec_env(exec_env), _data_consumer_pool(config::routine_load_consumer_pool_size) {
     REGISTER_HOOK_METRIC(routine_load_task_count, [this]() {
         // std::lock_guard<std::mutex> l(_lock);
         return _task_map.size();
@@ -77,13 +74,21 @@ RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* exec_env)
 
 RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() {
     DEREGISTER_HOOK_METRIC(routine_load_task_count);
-    _thread_pool.shutdown();
-    _thread_pool.join();
-
+    if (_thread_pool) {
+        _thread_pool->shutdown();
+    }
     LOG(INFO) << _task_map.size() << " not executed tasks left, cleanup";
     _task_map.clear();
 }
 
+Status RoutineLoadTaskExecutor::init() {
+    return ThreadPoolBuilder("routine_load")
+            .set_min_threads(0)
+            .set_max_threads(config::max_routine_load_thread_pool_size)
+            .set_max_queue_size(config::max_routine_load_thread_pool_size)
+            .build(&_thread_pool);
+}
+
 // Create a temp StreamLoadContext and set some kafka connection info in it.
 // So that we can use this ctx to get kafka data consumer instance.
 Status RoutineLoadTaskExecutor::_prepare_ctx(const PKafkaMetaProxyRequest& request,
@@ -180,10 +185,10 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
         return Status::OK();
     }
 
-    if (_task_map.size() >= config::routine_load_thread_pool_size) {
+    if (_task_map.size() >= config::max_routine_load_thread_pool_size) {
         LOG(INFO) << "too many tasks in thread pool. reject task: " << UniqueId(task.id)
                   << ", job id: " << task.job_id
-                  << ", queue size: " << _thread_pool.get_queue_size()
+                  << ", queue size: " << _thread_pool->get_queue_size()
                   << ", current tasks num: " << _task_map.size();
         return Status::TooManyTasks("{}_{}", UniqueId(task.id).to_string(),
                                     BackendOptions::get_localhost());
@@ -250,7 +255,7 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
     _task_map[ctx->id] = ctx;
 
     // offer the task to thread pool
-    if (!_thread_pool.offer(std::bind<void>(
+    if (!_thread_pool->submit_func(std::bind<void>(
                 &RoutineLoadTaskExecutor::exec_task, this, ctx, &_data_consumer_pool,
                 [this](std::shared_ptr<StreamLoadContext> ctx) {
                     std::unique_lock<std::mutex> l(_lock);
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h
index cc10f4a3bff..7ae29d60380 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -27,8 +27,8 @@
 #include <vector>
 
 #include "runtime/routine_load/data_consumer_pool.h"
+#include "util/threadpool.h"
 #include "util/uid_util.h"
-#include "util/work_thread_pool.hpp"
 
 namespace doris {
 
@@ -51,6 +51,10 @@ public:
 
     ~RoutineLoadTaskExecutor();
 
+    Status init();
+
+    void stop();
+
     // submit a routine load task
     Status submit_task(const TRoutineLoadTask& task);
 
@@ -80,8 +84,8 @@ private:
                         std::shared_ptr<StreamLoadContext> ctx);
 
 private:
-    ExecEnv* _exec_env;
-    PriorityThreadPool _thread_pool;
+    ExecEnv* _exec_env = nullptr;
+    std::unique_ptr<ThreadPool> _thread_pool;
     DataConsumerPool _data_consumer_pool;
 
     std::mutex _lock;
diff --git a/be/test/runtime/routine_load_task_executor_test.cpp b/be/test/runtime/routine_load_task_executor_test.cpp
index f1088300cdd..c8bdb37ab6b 100644
--- a/be/test/runtime/routine_load_task_executor_test.cpp
+++ b/be/test/runtime/routine_load_task_executor_test.cpp
@@ -59,7 +59,7 @@ public:
         _env.set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
         _env.set_stream_load_executor(StreamLoadExecutor::create_unique(&_env));
 
-        config::routine_load_thread_pool_size = 5;
+        config::max_routine_load_thread_pool_size = 1024;
         config::max_consumer_num_per_group = 3;
     }
 
@@ -93,9 +93,10 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
     task.__set_kafka_load_info(k_info);
 
     RoutineLoadTaskExecutor executor(&_env);
-
-    // submit task
     Status st;
+    st = executor.init();
+    EXPECT_TRUE(st.ok());
+    // submit task
     st = executor.submit_task(task);
     EXPECT_TRUE(st.ok());
 
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 3d0f11353d3..90689d7922b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1135,8 +1135,8 @@ public class Config extends ConfigBase {
     /**
      * the max concurrent routine load task num per BE.
      * This is to limit the num of routine load tasks sending to a BE, and it should also less
-     * than BE config 'routine_load_thread_pool_size'(default 10),
-     * which is the routine load task thread pool size on BE.
+     * than BE config 'max_routine_load_thread_pool_size'(default 1024),
+     * which is the routine load task thread pool max size on BE.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int max_routine_load_task_num_per_be = 1024;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to