yiguolei commented on code in PR #31376:
URL: https://github.com/apache/doris/pull/31376#discussion_r1501755687


##########
be/src/runtime/task_group/task_group.cpp:
##########
@@ -266,8 +273,167 @@ Status TaskGroupInfo::parse_topic_info(const 
TWorkloadGroupInfo& workload_group_
         task_group_info->scan_thread_num = workload_group_info.scan_thread_num;
     }
 
+    // 10 max remote scan thread num
+    task_group_info->max_remote_scan_thread_num = 
config::doris_scanner_thread_pool_thread_num;
+    if (workload_group_info.__isset.max_remote_scan_thread_num &&
+        workload_group_info.max_remote_scan_thread_num > 0) {
+        task_group_info->max_remote_scan_thread_num =
+                workload_group_info.max_remote_scan_thread_num;
+    }
+
+    // 11 min remote scan thread num
+    task_group_info->min_remote_scan_thread_num = 
config::doris_scanner_thread_pool_thread_num;
+    if (workload_group_info.__isset.min_remote_scan_thread_num &&
+        workload_group_info.min_remote_scan_thread_num > 0) {
+        task_group_info->min_remote_scan_thread_num =
+                workload_group_info.min_remote_scan_thread_num;
+    }
+
     return Status::OK();
 }
 
+void TaskGroup::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, 
ExecEnv* exec_env) {
+    uint64_t tg_id = tg_info->id;
+    std::string tg_name = tg_info->name;
+    int cpu_hard_limit = tg_info->cpu_hard_limit;
+    uint64_t cpu_shares = tg_info->cpu_share;
+    bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
+    int scan_thread_num = tg_info->scan_thread_num;
+    int max_remote_scan_thread_num = tg_info->max_remote_scan_thread_num;
+    int min_remote_scan_thread_num = tg_info->min_remote_scan_thread_num;
+
+    std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
+    if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) {
+        std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = 
std::make_unique<CgroupV1CpuCtl>(tg_id);
+        Status ret = cgroup_cpu_ctl->init();
+        if (ret.ok()) {
+            _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
+            LOG(INFO) << "[upsert wg thread pool] cgroup init success";
+        } else {
+            LOG(INFO) << "[upsert wg thread pool] cgroup init failed, gid= " 
<< tg_id
+                      << ", reason=" << ret.to_string();
+        }
+    }
+
+    CgroupCpuCtl* cg_cpu_ctl_ptr = _cgroup_cpu_ctl.get();
+
+    if (_task_sched == nullptr) {
+        int32_t executors_size = config::pipeline_executor_size;
+        if (executors_size <= 0) {
+            executors_size = CpuInfo::num_cores();
+        }
+        auto task_queue = 
std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
+        std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
+                std::make_unique<pipeline::TaskScheduler>(
+                        exec_env, exec_env->get_global_block_scheduler(), 
std::move(task_queue),
+                        "Exec_" + tg_name, cg_cpu_ctl_ptr);

Review Comment:
   Pipeline_ instead of Exec_



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to