wangbo commented on code in PR #19526: URL: https://github.com/apache/doris/pull/19526#discussion_r1192956627
########## be/src/runtime/task_group/task_group.cpp: ########## @@ -63,36 +72,105 @@ std::string TaskGroupEntity::debug_string() const { cpu_share(), _queue.size(), _vruntime_ns); } -TaskGroup::TaskGroup(uint64_t id, std::string name, uint64_t cpu_share, int64_t version) - : _id(id), _name(name), _cpu_share(cpu_share), _task_entity(this), _version(version) {} +TaskGroup::TaskGroup(const TaskGroupInfo& tg_info) + : _id(tg_info.id), + _name(tg_info.name), + _cpu_share(tg_info.cpu_share), + _memory_limit(tg_info.memory_limit), + _version(tg_info.version), + _task_entity(this), + _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM) {} std::string TaskGroup::debug_string() const { - std::shared_lock<std::shared_mutex> rl {mutex}; - return fmt::format("TG[id = {}, name = {}, cpu_share = {}, version = {}]", _id, _name, - cpu_share(), _version); -} - -bool TaskGroup::check_version(int64_t version) const { - std::shared_lock<std::shared_mutex> rl {mutex}; - return version > _version; + std::shared_lock<std::shared_mutex> rl {_mutex}; + return fmt::format("TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, version = {}]", + _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES), + _version); } void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) { - if (tg_info._id != _id) { + if (UNLIKELY(tg_info.id != _id)) { return; } + { + std::shared_lock<std::shared_mutex> rl {_mutex}; + if (LIKELY(tg_info.version <= _version)) { + return; + } + } - std::lock_guard<std::shared_mutex> wl {mutex}; - if (tg_info._version > _version) { - _name = tg_info._name; - _cpu_share = tg_info._cpu_share; - _version = tg_info._version; + std::lock_guard<std::shared_mutex> wl {_mutex}; + if (tg_info.version > _version) { + _name = tg_info.name; + _version = tg_info.version; + _memory_limit = tg_info.memory_limit; + if (_cpu_share != tg_info.cpu_share) { + ExecEnv::GetInstance()->pipeline_task_group_scheduler()->update_tg_cpu_share( + tg_info, shared_from_this()); + } } 
} +void TaskGroup::update_cpu_share_unlock(const TaskGroupInfo& tg_info) { + _cpu_share = tg_info.cpu_share; +} + +std::list<MemTrackerLimiter*>::iterator TaskGroup::add_mem_tracker_limiter( + MemTrackerLimiter* mem_tracker_ptr, int64_t group_num) { + std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock); + return _mem_tracker_limiter_pool[group_num].trackers.insert( + _mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr); +} + +void TaskGroup::remove_mem_tracker_limiter(int64_t group_num, + const std::list<MemTrackerLimiter*>::iterator& iter) { + std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock); + _mem_tracker_limiter_pool[group_num].trackers.erase(iter); +} + +int64_t TaskGroup::memory_limit_gc() { Review Comment: From the perspective of code style, maybe it's better to move this method to mem_tracker_limiter. The RG GC logic may become more complex later, so I think we can separate it into another place; it could be the current mem_tracker_limiter, or a ResourceGroupMemTrackerLimiter. TaskGroup just needs to hold the task's info. We can discuss it later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org