This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new cb0746dbab1 branch-4.0: [pipeline](conf) make blocking scheduler 
configurable #57354 (#57392)
cb0746dbab1 is described below

commit cb0746dbab1f651bb8fbdaddbb99abdbb9612629
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Oct 28 14:39:20 2025 +0800

    branch-4.0: [pipeline](conf) make blocking scheduler configurable #57354 
(#57392)
    
    Cherry-picked from #57354
    
    Co-authored-by: Gabriel <[email protected]>
---
 be/src/common/config.cpp                         | 1 +
 be/src/common/config.h                           | 1 +
 be/src/pipeline/task_scheduler.h                 | 7 ++++---
 be/src/runtime/workload_group/workload_group.cpp | 7 +++++++
 be/src/runtime/workload_group/workload_group.h   | 1 +
 5 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 65b245dabe6..5e7871814fb 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1091,6 +1091,7 @@ DEFINE_Bool(enable_graceful_exit_check, "false");
 DEFINE_Bool(enable_debug_points, "false");
 
 DEFINE_Int32(pipeline_executor_size, "0");
+DEFINE_Int32(blocking_pipeline_executor_size, "0");
 DEFINE_Bool(enable_workload_group_for_scan, "false");
 DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index d8aa651fc68..4d3b1a73420 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1122,6 +1122,7 @@ DECLARE_Bool(enable_graceful_exit_check);
 DECLARE_Bool(enable_debug_points);
 
 DECLARE_Int32(pipeline_executor_size);
+DECLARE_Int32(blocking_pipeline_executor_size);
 
 // block file cache
 DECLARE_Bool(enable_file_cache);
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 62b1b644c90..269116b5e4f 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -80,10 +80,11 @@ private:
 
 class HybridTaskScheduler MOCK_REMOVE(final) : public TaskScheduler {
 public:
-    HybridTaskScheduler(int core_num, std::string name,
+    HybridTaskScheduler(int exec_thread_num, int blocking_exec_thread_num, 
std::string name,
                         std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
-            : _blocking_scheduler(core_num * 2, name + "_blocking_scheduler", 
cgroup_cpu_ctl),
-              _simple_scheduler(core_num, name + "_simple_scheduler", 
cgroup_cpu_ctl) {}
+            : _blocking_scheduler(blocking_exec_thread_num, name + 
"_blocking_scheduler",
+                                  cgroup_cpu_ctl),
+              _simple_scheduler(exec_thread_num, name + "_simple_scheduler", 
cgroup_cpu_ctl) {}
 
     Status submit(PipelineTaskSPtr task) override;
 
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index b7bf59a80a3..bcb1daf818e 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -414,6 +414,10 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
     if (exec_thread_num <= 0) {
         exec_thread_num = CpuInfo::num_cores();
     }
+    int blocking_exec_thread_num = config::blocking_pipeline_executor_size;
+    if (blocking_exec_thread_num <= 0) {
+        blocking_exec_thread_num = CpuInfo::num_cores() * 2;
+    }
 
     int num_disk = 1;
     int num_cpus = 1;
@@ -484,6 +488,7 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
             .total_query_slot_count = total_query_slot_count,
             .slot_mem_policy = slot_mem_policy,
             .pipeline_exec_thread_num = exec_thread_num,
+            .blocking_pipeline_exec_thread_num = blocking_exec_thread_num,
             .max_flush_thread_num = max_flush_thread_num,
             .min_flush_thread_num = min_flush_thread_num};
 }
@@ -522,6 +527,7 @@ Status 
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
     uint64_t wg_id = wg_info->id;
     std::string wg_name = wg_info->name;
     int pipeline_exec_thread_num = wg_info->pipeline_exec_thread_num;
+    int blocking_exec_thread_num = wg_info->blocking_pipeline_exec_thread_num;
     int scan_thread_num = wg_info->scan_thread_num;
     int max_remote_scan_thread_num = wg_info->max_remote_scan_thread_num;
     int min_remote_scan_thread_num = wg_info->min_remote_scan_thread_num;
@@ -532,6 +538,7 @@ Status 
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
     if (_task_sched == nullptr) {
         std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
                 
std::make_unique<pipeline::HybridTaskScheduler>(pipeline_exec_thread_num,
+                                                                
blocking_exec_thread_num,
                                                                 "p_" + 
wg_name, cg_cpu_ctl_ptr);
         Status ret = pipeline_task_scheduler->start();
         if (ret.ok()) {
diff --git a/be/src/runtime/workload_group/workload_group.h 
b/be/src/runtime/workload_group/workload_group.h
index 2677596b82b..cfdbaa8b4b8 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -285,6 +285,7 @@ struct WorkloadGroupInfo {
     int cgroup_cpu_hard_limit = 0;
     const bool valid = true;
     const int pipeline_exec_thread_num = 0;
+    const int blocking_pipeline_exec_thread_num = 0;
     const int max_flush_thread_num = 0;
     const int min_flush_thread_num = 0;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to