yiguolei commented on code in PR #31376: URL: https://github.com/apache/doris/pull/31376#discussion_r1501755687
########## be/src/runtime/task_group/task_group.cpp: ########## @@ -266,8 +273,167 @@ Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& workload_group_ task_group_info->scan_thread_num = workload_group_info.scan_thread_num; } + // 10 max remote scan thread num + task_group_info->max_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num; + if (workload_group_info.__isset.max_remote_scan_thread_num && + workload_group_info.max_remote_scan_thread_num > 0) { + task_group_info->max_remote_scan_thread_num = + workload_group_info.max_remote_scan_thread_num; + } + + // 11 min remote scan thread num + task_group_info->min_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num; + if (workload_group_info.__isset.min_remote_scan_thread_num && + workload_group_info.min_remote_scan_thread_num > 0) { + task_group_info->min_remote_scan_thread_num = + workload_group_info.min_remote_scan_thread_num; + } + return Status::OK(); } +void TaskGroup::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env) { + uint64_t tg_id = tg_info->id; + std::string tg_name = tg_info->name; + int cpu_hard_limit = tg_info->cpu_hard_limit; + uint64_t cpu_shares = tg_info->cpu_share; + bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit; + int scan_thread_num = tg_info->scan_thread_num; + int max_remote_scan_thread_num = tg_info->max_remote_scan_thread_num; + int min_remote_scan_thread_num = tg_info->min_remote_scan_thread_num; + + std::lock_guard<std::shared_mutex> wlock(_task_sched_lock); + if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) { + std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = std::make_unique<CgroupV1CpuCtl>(tg_id); + Status ret = cgroup_cpu_ctl->init(); + if (ret.ok()) { + _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl); + LOG(INFO) << "[upsert wg thread pool] cgroup init success"; + } else { + LOG(INFO) << "[upsert wg thread pool] cgroup init failed, gid= " << tg_id + << ", reason=" << 
ret.to_string(); + } + } + + CgroupCpuCtl* cg_cpu_ctl_ptr = _cgroup_cpu_ctl.get(); + + if (_task_sched == nullptr) { + int32_t executors_size = config::pipeline_executor_size; + if (executors_size <= 0) { + executors_size = CpuInfo::num_cores(); + } + auto task_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size); + std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler = + std::make_unique<pipeline::TaskScheduler>( + exec_env, exec_env->get_global_block_scheduler(), std::move(task_queue), + "Exec_" + tg_name, cg_cpu_ctl_ptr); Review Comment: Use the prefix "Pipeline_" instead of "Exec_" for this scheduler's name. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org