This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2dfe8cf3362c1437ef182eec2ee0b666374eac23
Author: wangbo <wan...@apache.org>
AuthorDate: Sat Apr 13 14:24:49 2024 +0800

    [Fix](executor)reset remote scan thread num #33579
---
 be/src/runtime/workload_group/workload_group.cpp | 15 +++++++++++++--
 be/src/vec/exec/scan/scanner_scheduler.h         |  7 ++++---
 2 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index 19ed39a6ab7..1b15e89b08e 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -374,7 +374,9 @@ void 
WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
         std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
                 std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" 
+ tg_name,
                                                                       
cg_cpu_ctl_ptr);
-        Status ret = scan_scheduler->start();
+        Status ret = 
scan_scheduler->start(config::doris_scanner_thread_pool_thread_num,
+                                           
config::doris_scanner_thread_pool_thread_num,
+                                           
config::doris_scanner_thread_pool_queue_size);
         if (ret.ok()) {
             _scan_task_sched = std::move(scan_scheduler);
         } else {
@@ -386,10 +388,19 @@ void 
WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
     }
 
     if (_remote_scan_task_sched == nullptr) {
+        int remote_max_thread_num =
+                config::doris_max_remote_scanner_thread_pool_thread_num != -1
+                        ? 
config::doris_max_remote_scanner_thread_pool_thread_num
+                        : std::max(512, CpuInfo::num_cores() * 10);
+        remote_max_thread_num =
+                std::max(remote_max_thread_num, 
config::doris_scanner_thread_pool_thread_num);
+
         std::unique_ptr<vectorized::SimplifiedScanScheduler> 
remote_scan_scheduler =
                 std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" 
+ tg_name,
                                                                       
cg_cpu_ctl_ptr);
-        Status ret = remote_scan_scheduler->start();
+        Status ret =
+                remote_scan_scheduler->start(remote_max_thread_num, 
remote_max_thread_num,
+                                             
config::doris_remote_scanner_thread_pool_queue_size);
         if (ret.ok()) {
             _remote_scan_task_sched = std::move(remote_scan_scheduler);
         } else {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index f3f9caaa4d3..01e08596434 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -118,10 +118,11 @@ public:
         _scan_thread_pool->wait();
     }
 
-    Status start() {
+    Status start(int max_thread_num, int min_thread_num, int queue_size) {
         RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name)
-                                
.set_min_threads(config::doris_scanner_thread_pool_thread_num)
-                                
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
+                                .set_min_threads(min_thread_num)
+                                .set_max_threads(max_thread_num)
+                                .set_max_queue_size(queue_size)
                                 .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
                                 .build(&_scan_thread_pool));
         return Status::OK();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to