wangbo commented on code in PR #19526: URL: https://github.com/apache/doris/pull/19526#discussion_r1192231810
########## be/src/runtime/task_group/task_group.cpp: ########## @@ -63,36 +72,105 @@ std::string TaskGroupEntity::debug_string() const { cpu_share(), _queue.size(), _vruntime_ns); } -TaskGroup::TaskGroup(uint64_t id, std::string name, uint64_t cpu_share, int64_t version) - : _id(id), _name(name), _cpu_share(cpu_share), _task_entity(this), _version(version) {} +TaskGroup::TaskGroup(const TaskGroupInfo& tg_info) + : _id(tg_info.id), + _name(tg_info.name), + _cpu_share(tg_info.cpu_share), + _memory_limit(tg_info.memory_limit), + _version(tg_info.version), + _task_entity(this), + _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM) {} std::string TaskGroup::debug_string() const { - std::shared_lock<std::shared_mutex> rl {mutex}; - return fmt::format("TG[id = {}, name = {}, cpu_share = {}, version = {}]", _id, _name, - cpu_share(), _version); -} - -bool TaskGroup::check_version(int64_t version) const { - std::shared_lock<std::shared_mutex> rl {mutex}; - return version > _version; + std::shared_lock<std::shared_mutex> rl {_mutex}; + return fmt::format("TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, version = {}]", + _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES), + _version); } void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) { - if (tg_info._id != _id) { + if (UNLIKELY(tg_info.id != _id)) { return; } + { + std::shared_lock<std::shared_mutex> rl {_mutex}; + if (LIKELY(tg_info.version <= _version)) { + return; + } + } - std::lock_guard<std::shared_mutex> wl {mutex}; - if (tg_info._version > _version) { - _name = tg_info._name; - _cpu_share = tg_info._cpu_share; - _version = tg_info._version; + std::lock_guard<std::shared_mutex> wl {_mutex}; + if (tg_info.version > _version) { + _name = tg_info.name; + _version = tg_info.version; + _memory_limit = tg_info.memory_limit; + if (_cpu_share != tg_info.cpu_share) { + ExecEnv::GetInstance()->pipeline_task_group_scheduler()->update_tg_cpu_share( + tg_info, shared_from_this()); + } } 
} +void TaskGroup::update_cpu_share_unlock(const TaskGroupInfo& tg_info) { + _cpu_share = tg_info.cpu_share; +} + +std::list<MemTrackerLimiter*>::iterator TaskGroup::add_mem_tracker_limiter( + MemTrackerLimiter* mem_tracker_ptr, int64_t group_num) { + std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock); + return _mem_tracker_limiter_pool[group_num].trackers.insert( + _mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr); +} + +void TaskGroup::remove_mem_tracker_limiter(int64_t group_num, + const std::list<MemTrackerLimiter*>::iterator& iter) { + std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock); + _mem_tracker_limiter_pool[group_num].trackers.erase(iter); +} + +int64_t TaskGroup::memory_limit_gc() { + int64_t used_memory = 0; + for (auto& mem_tracker_group : _mem_tracker_limiter_pool) { + std::lock_guard<std::mutex> l(mem_tracker_group.group_lock); + for (const auto& tracker : mem_tracker_group.trackers) { + used_memory += tracker->consumption(); + } + } + + if (used_memory <= memory_limit()) { Review Comment: If Group A's mem limit is 50% and Group B's mem limit is 50%, and Group A actually uses 49% while Group B actually uses 49%, then neither Group A's nor Group B's memory can be released, even if BE occupies too much memory. This is different from the current logic; in the current logic, a big query could be released. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org