This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 2eb85bc55e9 [opt](routine-load) optimize routine load task thread pool and related param(#32282) (#37443) 2eb85bc55e9 is described below commit 2eb85bc55e965ae7e071bcfef92dff33b4f236af Author: hui lai <1353307...@qq.com> AuthorDate: Tue Jul 9 10:48:51 2024 +0800 [opt](routine-load) optimize routine load task thread pool and related param(#32282) (#37443) pick (#32282) --- be/src/common/config.cpp | 4 ++-- be/src/common/config.h | 4 ++-- be/src/runtime/exec_env_init.cpp | 1 + .../routine_load/routine_load_task_executor.cpp | 25 +++++++++++++--------- .../routine_load/routine_load_task_executor.h | 10 ++++++--- .../runtime/routine_load_task_executor_test.cpp | 7 +++--- .../main/java/org/apache/doris/common/Config.java | 4 ++-- 7 files changed, 33 insertions(+), 22 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 7fc4bc9c5d6..ee9201bf6b5 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -604,9 +604,9 @@ DEFINE_Bool(enable_metric_calculator, "true"); // max consumer num in one data consumer group, for routine load DEFINE_mInt32(max_consumer_num_per_group, "3"); -// the size of thread pool for routine load task. +// the max size of thread pool for routine load task. // this should be larger than FE config 'max_routine_load_task_num_per_be' (default 5) -DEFINE_Int32(routine_load_thread_pool_size, "10"); +DEFINE_Int32(max_routine_load_thread_pool_size, "1024"); // max external scan cache batch count, means cache max_memory_cache_batch_count * batch_size row // default is 20, batch_size's default value is 1024 means 20 * 1024 rows will be cached diff --git a/be/src/common/config.h b/be/src/common/config.h index c762df19f53..dc750fc511f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -655,9 +655,9 @@ DECLARE_Bool(enable_metric_calculator); // max consumer num in one data consumer group, for routine load DECLARE_mInt32(max_consumer_num_per_group); -// the size of thread pool for routine load task. +// the max size of thread pool for routine load task. // this should be larger than FE config 'max_routine_load_task_num_per_be' (default 5) -DECLARE_Int32(routine_load_thread_pool_size); +DECLARE_Int32(max_routine_load_thread_pool_size); // max external scan cache batch count, means cache max_memory_cache_batch_count * batch_size row // default is 20, batch_size's default value is 1024 means 20 * 1024 rows will be cached diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 48f1fc0651b..560466f89e2 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -166,6 +166,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) { _function_client_cache = new BrpcClientCache<PFunctionService_Stub>(); _stream_load_executor = StreamLoadExecutor::create_shared(this); _routine_load_task_executor = new RoutineLoadTaskExecutor(this); + RETURN_IF_ERROR(_routine_load_task_executor->init()); _small_file_mgr = new SmallFileMgr(this, config::small_file_dir); _block_spill_mgr = new BlockSpillManager(_store_paths); _file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num); diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index ebf7f256e5b..0d59f27ef4b 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -63,10 +63,7 @@ using namespace ErrorCode; DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(routine_load_task_count, MetricUnit::NOUNIT); RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* exec_env) - : _exec_env(exec_env), - _thread_pool(config::routine_load_thread_pool_size, config::routine_load_thread_pool_size, - "routine_load"), - _data_consumer_pool(config::routine_load_consumer_pool_size) { + : _exec_env(exec_env), _data_consumer_pool(config::routine_load_consumer_pool_size) { REGISTER_HOOK_METRIC(routine_load_task_count, [this]() { // std::lock_guard<std::mutex> l(_lock); return _task_map.size(); @@ -77,13 +74,21 @@ RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* exec_env) RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() { DEREGISTER_HOOK_METRIC(routine_load_task_count); - _thread_pool.shutdown(); - _thread_pool.join(); - + if (_thread_pool) { + _thread_pool->shutdown(); + } LOG(INFO) << _task_map.size() << " not executed tasks left, cleanup"; _task_map.clear(); } +Status RoutineLoadTaskExecutor::init() { + return ThreadPoolBuilder("routine_load") + .set_min_threads(0) + .set_max_threads(config::max_routine_load_thread_pool_size) + .set_max_queue_size(config::max_routine_load_thread_pool_size) + .build(&_thread_pool); +} + // Create a temp StreamLoadContext and set some kafka connection info in it. // So that we can use this ctx to get kafka data consumer instance. Status RoutineLoadTaskExecutor::_prepare_ctx(const PKafkaMetaProxyRequest& request, @@ -180,10 +185,10 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { return Status::OK(); } - if (_task_map.size() >= config::routine_load_thread_pool_size) { + if (_task_map.size() >= config::max_routine_load_thread_pool_size) { LOG(INFO) << "too many tasks in thread pool. reject task: " << UniqueId(task.id) << ", job id: " << task.job_id - << ", queue size: " << _thread_pool.get_queue_size() + << ", queue size: " << _thread_pool->get_queue_size() << ", current tasks num: " << _task_map.size(); return Status::TooManyTasks("{}_{}", UniqueId(task.id).to_string(), BackendOptions::get_localhost()); @@ -250,7 +255,7 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { _task_map[ctx->id] = ctx; // offer the task to thread pool - if (!_thread_pool.offer(std::bind<void>( + if (!_thread_pool->submit_func(std::bind<void>( &RoutineLoadTaskExecutor::exec_task, this, ctx, &_data_consumer_pool, [this](std::shared_ptr<StreamLoadContext> ctx) { std::unique_lock<std::mutex> l(_lock); diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h index cc10f4a3bff..7ae29d60380 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.h +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -27,8 +27,8 @@ #include <vector> #include "runtime/routine_load/data_consumer_pool.h" +#include "util/threadpool.h" #include "util/uid_util.h" -#include "util/work_thread_pool.hpp" namespace doris { @@ -51,6 +51,10 @@ public: ~RoutineLoadTaskExecutor(); + Status init(); + + void stop(); + // submit a routine load task Status submit_task(const TRoutineLoadTask& task); @@ -80,8 +84,8 @@ private: std::shared_ptr<StreamLoadContext> ctx); private: - ExecEnv* _exec_env; - PriorityThreadPool _thread_pool; + ExecEnv* _exec_env = nullptr; + std::unique_ptr<ThreadPool> _thread_pool; DataConsumerPool _data_consumer_pool; std::mutex _lock; diff --git a/be/test/runtime/routine_load_task_executor_test.cpp b/be/test/runtime/routine_load_task_executor_test.cpp index f1088300cdd..c8bdb37ab6b 100644 --- a/be/test/runtime/routine_load_task_executor_test.cpp +++ b/be/test/runtime/routine_load_task_executor_test.cpp @@ -59,7 +59,7 @@ public: _env.set_new_load_stream_mgr(NewLoadStreamMgr::create_unique()); _env.set_stream_load_executor(StreamLoadExecutor::create_unique(&_env)); - config::routine_load_thread_pool_size = 5; + config::max_routine_load_thread_pool_size = 1024; config::max_consumer_num_per_group = 3; } @@ -93,9 +93,10 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) { task.__set_kafka_load_info(k_info); RoutineLoadTaskExecutor executor(&_env); - - // submit task Status st; + st = executor.init(); + EXPECT_TRUE(st.ok()); + // submit task st = executor.submit_task(task); EXPECT_TRUE(st.ok()); diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 3d0f11353d3..90689d7922b 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1135,8 +1135,8 @@ public class Config extends ConfigBase { /** * the max concurrent routine load task num per BE. * This is to limit the num of routine load tasks sending to a BE, and it should also less - * than BE config 'routine_load_thread_pool_size'(default 10), - * which is the routine load task thread pool size on BE. + * than BE config 'max_routine_load_thread_pool_size'(default 1024), + * which is the routine load task thread pool max size on BE. */ @ConfField(mutable = true, masterOnly = true) public static int max_routine_load_task_num_per_be = 1024; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org