wangbo commented on code in PR #31376: URL: https://github.com/apache/doris/pull/31376#discussion_r1501763845
########## be/src/runtime/task_group/task_group.cpp: ########## @@ -266,8 +273,167 @@ Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& workload_group_ task_group_info->scan_thread_num = workload_group_info.scan_thread_num; } + // 10 max remote scan thread num + task_group_info->max_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num; + if (workload_group_info.__isset.max_remote_scan_thread_num && + workload_group_info.max_remote_scan_thread_num > 0) { + task_group_info->max_remote_scan_thread_num = + workload_group_info.max_remote_scan_thread_num; + } + + // 11 min remote scan thread num + task_group_info->min_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num; + if (workload_group_info.__isset.min_remote_scan_thread_num && + workload_group_info.min_remote_scan_thread_num > 0) { + task_group_info->min_remote_scan_thread_num = + workload_group_info.min_remote_scan_thread_num; + } + return Status::OK(); } +void TaskGroup::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env) { + uint64_t tg_id = tg_info->id; + std::string tg_name = tg_info->name; + int cpu_hard_limit = tg_info->cpu_hard_limit; + uint64_t cpu_shares = tg_info->cpu_share; + bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit; + int scan_thread_num = tg_info->scan_thread_num; + int max_remote_scan_thread_num = tg_info->max_remote_scan_thread_num; + int min_remote_scan_thread_num = tg_info->min_remote_scan_thread_num; + + std::lock_guard<std::shared_mutex> wlock(_task_sched_lock); + if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) { + std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = std::make_unique<CgroupV1CpuCtl>(tg_id); + Status ret = cgroup_cpu_ctl->init(); + if (ret.ok()) { + _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl); + LOG(INFO) << "[upsert wg thread pool] cgroup init success"; + } else { + LOG(INFO) << "[upsert wg thread pool] cgroup init failed, gid= " << tg_id + << ", reason=" << 
ret.to_string(); + } + } + + CgroupCpuCtl* cg_cpu_ctl_ptr = _cgroup_cpu_ctl.get(); + + if (_task_sched == nullptr) { + int32_t executors_size = config::pipeline_executor_size; + if (executors_size <= 0) { + executors_size = CpuInfo::num_cores(); + } + auto task_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size); + std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler = + std::make_unique<pipeline::TaskScheduler>( + exec_env, exec_env->get_global_block_scheduler(), std::move(task_queue), + "Exec_" + tg_name, cg_cpu_ctl_ptr); + Status ret = pipeline_task_scheduler->start(); + if (ret.ok()) { + _task_sched = std::move(pipeline_task_scheduler); + } else { + LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, gid= " << tg_id; + } + } + + if (_scan_task_sched == nullptr) { + std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler = + std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" + tg_name, + cg_cpu_ctl_ptr); + Status ret = scan_scheduler->start(); + if (ret.ok()) { + _scan_task_sched = std::move(scan_scheduler); + } else { + LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, gid=" << tg_id; + } + } + if (scan_thread_num > 0 && _scan_task_sched) { + _scan_task_sched->reset_thread_num(scan_thread_num); + } + + if (_remote_scan_task_sched == nullptr) { + std::unique_ptr<vectorized::SimplifiedScanScheduler> remote_scan_scheduler = + std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + tg_name, + cg_cpu_ctl_ptr); + Status ret = remote_scan_scheduler->start(); + if (ret.ok()) { + _remote_scan_task_sched = std::move(remote_scan_scheduler); + } else { + LOG(INFO) << "[upsert wg thread pool] remote scan scheduler start failed, gid=" + << tg_id; + } + } + if (max_remote_scan_thread_num > 0 && _remote_scan_task_sched) { + _remote_scan_task_sched->reset_max_thread_num(max_remote_scan_thread_num); + } + if (min_remote_scan_thread_num > 0 && _remote_scan_task_sched) { + 
_remote_scan_task_sched->reset_min_thread_num(min_remote_scan_thread_num); + } + + if (_non_pipe_thread_pool == nullptr) { + std::unique_ptr<ThreadPool> thread_pool = nullptr; + auto ret = ThreadPoolBuilder("nonPip_" + tg_name) + .set_min_threads(1) + .set_max_threads(config::fragment_pool_thread_num_max) + .set_max_queue_size(config::fragment_pool_queue_size) + .set_cgroup_cpu_ctl(cg_cpu_ctl_ptr) + .build(&thread_pool); + if (!ret.ok()) { + LOG(INFO) << "[upsert wg thread pool] create non-pipline thread pool failed, gid=" + << tg_id; + } else { + _non_pipe_thread_pool = std::move(thread_pool); + } + } + + // step 6: update cgroup cpu if needed + if (_cgroup_cpu_ctl) { + if (enable_cpu_hard_limit) { + if (cpu_hard_limit > 0) { + _cgroup_cpu_ctl->update_cpu_hard_limit(cpu_hard_limit); + _cgroup_cpu_ctl->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE); + } else { + LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal: " + << cpu_hard_limit << ", gid=" << tg_id; + } + } else { + if (config::enable_cgroup_cpu_soft_limit) { + _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares); + _cgroup_cpu_ctl->update_cpu_hard_limit( + CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit + } + } + _cgroup_cpu_ctl->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares), + &(tg_info->cgroup_cpu_hard_limit)); + } +} + +void TaskGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched, + vectorized::SimplifiedScanScheduler** scan_sched, + ThreadPool** non_pipe_thread_pool, + vectorized::SimplifiedScanScheduler** remote_scan_sched) { + std::shared_lock<std::shared_mutex> rlock(_task_sched_lock); + *exec_sched = _task_sched.get(); Review Comment: The ```return global sched``` logic is implemented in get_pipe_exec_scheduler/get_non_pipe_exec_thread_pool. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org