This is an automated email from the ASF dual-hosted git repository. luozenglin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 6c9c9e9765 [feature-wip](resource-group) Supports memory hard isolation of resource group (#19526) 6c9c9e9765 is described below commit 6c9c9e97658859168ac7f43d54aa293491a8d257 Author: luozenglin <luozeng...@baidu.com> AuthorDate: Mon May 15 22:45:46 2023 +0800 [feature-wip](resource-group) Supports memory hard isolation of resource group (#19526) --- be/src/common/daemon.cpp | 19 ++- be/src/pipeline/task_queue.cpp | 6 +- be/src/pipeline/task_queue.h | 14 +- be/src/pipeline/task_scheduler.cpp | 9 +- be/src/pipeline/task_scheduler.h | 4 +- be/src/runtime/fragment_mgr.cpp | 37 ++--- be/src/runtime/memory/mem_tracker.h | 6 +- be/src/runtime/memory/mem_tracker_limiter.cpp | 180 +++++++++++++-------- be/src/runtime/memory/mem_tracker_limiter.h | 66 +++++++- be/src/runtime/query_context.h | 3 + be/src/runtime/task_group/task_group.cpp | 107 +++++++++--- be/src/runtime/task_group/task_group.h | 39 +++-- be/src/runtime/task_group/task_group_manager.cpp | 35 +++- be/src/runtime/task_group/task_group_manager.h | 2 + .../java/org/apache/doris/qe/StmtExecutor.java | 9 +- .../resource/resourcegroup/ResourceGroup.java | 43 +++-- .../resource/resourcegroup/ResourceGroupMgr.java | 23 ++- .../resourcegroup/ResourceGroupMgrTest.java | 6 + .../resource/resourcegroup/ResourceGroupTest.java | 21 ++- 19 files changed, 440 insertions(+), 189 deletions(-) diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 5c832cfd77..592442b920 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -47,6 +47,7 @@ #include "runtime/load_channel_mgr.h" #include "runtime/memory/mem_tracker.h" #include "runtime/memory/mem_tracker_limiter.h" +#include "runtime/task_group/task_group_manager.h" #include "runtime/user_function_cache.h" #include "service/backend_options.h" #include "util/cpu_info.h" @@ -230,10 +231,16 @@ void Daemon::memory_gc_thread() { if (!MemInfo::initialized() || !ExecEnv::GetInstance()->initialized()) { continue; } + auto sys_mem_available = doris::MemInfo::sys_mem_available(); + auto proc_mem_no_allocator_cache = doris::MemInfo::proc_mem_no_allocator_cache(); + + auto tg_free_mem = taskgroup::TaskGroupManager::instance()->memory_limit_gc(); + sys_mem_available += tg_free_mem; + proc_mem_no_allocator_cache -= tg_free_mem; + if (memory_full_gc_sleep_time_ms <= 0 && - (doris::MemInfo::sys_mem_available() < - doris::MemInfo::sys_mem_available_low_water_mark() || - doris::MemInfo::proc_mem_no_allocator_cache() >= doris::MemInfo::mem_limit())) { + (sys_mem_available < doris::MemInfo::sys_mem_available_low_water_mark() || + proc_mem_no_allocator_cache >= doris::MemInfo::mem_limit())) { // No longer full gc and minor gc during sleep. memory_full_gc_sleep_time_ms = config::memory_gc_sleep_time_s * 1000; memory_minor_gc_sleep_time_ms = config::memory_gc_sleep_time_s * 1000; @@ -243,10 +250,8 @@ void Daemon::memory_gc_thread() { doris::MemTrackerLimiter::enable_print_log_process_usage(); } } else if (memory_minor_gc_sleep_time_ms <= 0 && - (doris::MemInfo::sys_mem_available() < - doris::MemInfo::sys_mem_available_warning_water_mark() || - doris::MemInfo::proc_mem_no_allocator_cache() >= - doris::MemInfo::soft_mem_limit())) { + (sys_mem_available < doris::MemInfo::sys_mem_available_warning_water_mark() || + proc_mem_no_allocator_cache >= doris::MemInfo::soft_mem_limit())) { // No minor gc during sleep, but full gc is possible. memory_minor_gc_sleep_time_ms = config::memory_gc_sleep_time_s * 1000; doris::MemTrackerLimiter::print_log_process_usage("process minor gc", false); diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index 8078d7b414..807a344cf9 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -350,8 +350,8 @@ void TaskGroupTaskQueue::update_statistics(PipelineTask* task, int64_t time_spen } } -void TaskGroupTaskQueue::update_task_group(const taskgroup::TaskGroupInfo& task_group_info, - taskgroup::TaskGroupPtr& task_group) { +void TaskGroupTaskQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, + taskgroup::TaskGroupPtr task_group) { std::unique_lock<std::mutex> lock(_rs_mutex); auto* entity = task_group->task_entity(); bool is_in_queue = _group_entities.find(entity) != _group_entities.end(); @@ -359,7 +359,7 @@ void TaskGroupTaskQueue::update_task_group(const taskgroup::TaskGroupInfo& task_ _group_entities.erase(entity); _total_cpu_share -= entity->cpu_share(); } - task_group->check_and_update(task_group_info); + task_group->update_cpu_share_unlock(task_group_info); if (is_in_queue) { _group_entities.emplace(entity); _total_cpu_share += entity->cpu_share(); diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h index 6b16b70f99..9956ba3cb9 100644 --- a/be/src/pipeline/task_queue.h +++ b/be/src/pipeline/task_queue.h @@ -53,8 +53,8 @@ public: virtual void update_statistics(PipelineTask* task, int64_t time_spent) {} - virtual void update_task_group(const taskgroup::TaskGroupInfo& task_group_info, - taskgroup::TaskGroupPtr& task_group) = 0; + virtual void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, + taskgroup::TaskGroupPtr task_group) = 0; int cores() const { return _core_size; } @@ -154,9 +154,9 @@ public: time_spent); } - void update_task_group(const taskgroup::TaskGroupInfo& task_group_info, - taskgroup::TaskGroupPtr& task_group) override { - LOG(FATAL) << "update_task_group not implemented"; + void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, + taskgroup::TaskGroupPtr task_group) override { + LOG(FATAL) << "update_tg_cpu_share not implemented"; } private: @@ -184,8 +184,8 @@ public: void update_statistics(PipelineTask* task, int64_t time_spent) override; - void update_task_group(const taskgroup::TaskGroupInfo& task_group_info, - taskgroup::TaskGroupPtr& task_group) override; + void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, + taskgroup::TaskGroupPtr task_group) override; private: template <bool from_executor> diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index cd219a25e0..53e9a4d868 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -354,12 +354,9 @@ void TaskScheduler::shutdown() { } } -void TaskScheduler::try_update_task_group(const taskgroup::TaskGroupInfo& task_group_info, - taskgroup::TaskGroupPtr& task_group) { - if (!task_group->check_version(task_group_info._version)) { - return; - } - _task_queue->update_task_group(task_group_info, task_group); +void TaskScheduler::update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, + taskgroup::TaskGroupPtr task_group) { + _task_queue->update_tg_cpu_share(task_group_info, task_group); } } // namespace doris::pipeline diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index b2a665f034..1fcbe2c068 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -91,8 +91,8 @@ public: void shutdown(); - void try_update_task_group(const taskgroup::TaskGroupInfo& task_group_info, - taskgroup::TaskGroupPtr& task_group); + void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, + taskgroup::TaskGroupPtr task_group); private: std::unique_ptr<ThreadPool> _fix_thread_pool; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 0cca7fa10f..c8e6a8b25e 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -56,7 +56,6 @@ #include "io/fs/stream_load_pipe.h" #include "opentelemetry/trace/scope.h" #include "pipeline/pipeline_fragment_context.h" -#include "pipeline/task_scheduler.h" #include "runtime/client_cache.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" @@ -691,6 +690,25 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo query_ctx->query_mem_tracker->enable_print_log_usage(); } + if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) { + if (params.__isset.resource_groups && !params.resource_groups.empty()) { + taskgroup::TaskGroupInfo task_group_info; + auto status = taskgroup::TaskGroupInfo::parse_group_info(params.resource_groups[0], + &task_group_info); + if (status.ok()) { + auto tg = taskgroup::TaskGroupManager::instance()->get_or_create_task_group( + task_group_info); + tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker); + query_ctx->set_task_group(tg); + LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id) + << " use task group: " << tg->debug_string(); + } + } else { + VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id) + << " does not use task group."; + } + } + { // Find _query_ctx_map again, in case some other request has already // create the query fragments context. @@ -803,23 +821,6 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, std::shared_ptr<QueryContext> query_ctx; RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx)); - if (params.__isset.resource_groups && !params.resource_groups.empty()) { - taskgroup::TaskGroupInfo task_group_info; - auto status = taskgroup::TaskGroupInfo::parse_group_info(params.resource_groups[0], - &task_group_info); - if (status.ok()) { - auto tg = taskgroup::TaskGroupManager::instance()->get_or_create_task_group( - task_group_info); - _exec_env->pipeline_task_group_scheduler()->try_update_task_group(task_group_info, tg); - query_ctx->set_task_group(tg); - LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id) - << " use task group: " << tg->debug_string(); - } - } else { - VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id) - << " does not use task group."; - } - for (size_t i = 0; i < params.local_params.size(); i++) { const auto& local_params = params.local_params[i]; diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index 81f901879c..c21ef478e0 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -115,7 +115,7 @@ public: // For MemTrackerLimiter MemTracker() { _parent_group_num = -1; } - ~MemTracker(); + virtual ~MemTracker(); static std::string print_bytes(int64_t bytes) { return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES) @@ -154,13 +154,13 @@ public: static void refresh_all_tracker_profile(); public: - Snapshot make_snapshot() const; + virtual Snapshot make_snapshot() const; // Specify group_num from mem_tracker_pool to generate snapshot. static void make_group_snapshot(std::vector<Snapshot>* snapshots, int64_t group_num, std::string parent_label); static std::string log_usage(MemTracker::Snapshot snapshot); - std::string debug_string() { + virtual std::string debug_string() { std::stringstream msg; msg << "label: " << _label << "; " << "consumption: " << consumption() << "; " diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index d9974bef7f..076af90fba 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -29,6 +29,7 @@ #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "runtime/load_channel_mgr.h" +#include "runtime/task_group/task_group.h" #include "service/backend_options.h" #include "util/mem_info.h" #include "util/perf_counters.h" @@ -37,15 +38,10 @@ namespace doris { -struct TrackerLimiterGroup { - std::list<MemTrackerLimiter*> trackers; - std::mutex group_lock; -}; - // Save all MemTrackerLimiters in use. // Each group corresponds to several MemTrackerLimiters and has a lock. // Multiple groups are used to reduce the impact of locks. -static std::vector<TrackerLimiterGroup> mem_tracker_limiter_pool(1000); +static std::vector<TrackerLimiterGroup> mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM); std::atomic<bool> MemTrackerLimiter::_enable_print_log_process_usage {true}; bool MemTrackerLimiter::_oom_avoidance {true}; @@ -94,7 +90,7 @@ MemTrackerLimiter::~MemTrackerLimiter() { MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const { Snapshot snapshot; - snapshot.type = TypeString[_type]; + snapshot.type = type_string(_type); snapshot.label = _label; snapshot.limit = _limit; snapshot.cur_consumption = _consumption->current_value(); @@ -131,7 +127,7 @@ void MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot> int64_t process_mem_sum = 0; Snapshot snapshot; for (auto it : MemTrackerLimiter::TypeMemSum) { - snapshot.type = TypeString[it.first]; + snapshot.type = type_string(it.first); snapshot.label = ""; snapshot.limit = -1; snapshot.cur_consumption = it.second->current_value(); @@ -315,14 +311,35 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str(int64_t bytes) { int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, const std::string& vm_rss_str, const std::string& mem_available_str, Type type) { - std::priority_queue<std::pair<int64_t, std::string>, - std::vector<std::pair<int64_t, std::string>>, - std::greater<std::pair<int64_t, std::string>>> - min_pq; + return free_top_memory_query( + min_free_mem, type, mem_tracker_limiter_pool, + [&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption, + const std::string& label) { + return fmt::format( + "Process has no memory available, cancel top memory usage {}: " + "{} memory tracker <{}> consumption {}, backend {} " + "process memory used {} exceed limit {} or sys mem available {} " + "less than low water mark {}. Execute again after enough memory, " + "details see be.INFO.", + type_string(type), type_string(type), label, print_bytes(mem_consumption), + BackendOptions::get_localhost(), vm_rss_str, MemInfo::mem_limit_str(), + mem_available_str, + print_bytes(MemInfo::sys_mem_available_low_water_mark())); + }); +} + +template <typename TrackerGroups> +int64_t MemTrackerLimiter::free_top_memory_query( + int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups, + const std::function<std::string(int64_t, const std::string&)>& cancel_msg) { + using MemTrackerMinQueue = std::priority_queue<std::pair<int64_t, std::string>, + std::vector<std::pair<int64_t, std::string>>, + std::greater<std::pair<int64_t, std::string>>>; + MemTrackerMinQueue min_pq; // After greater than min_free_mem, will not be modified. int64_t prepare_free_mem = 0; - auto cancel_top_query = [&](auto min_pq) -> int64_t { + auto cancel_top_query = [&cancel_msg, type](auto& min_pq) -> int64_t { std::vector<std::string> usage_strings; int64_t freed_mem = 0; while (!min_pq.empty()) { @@ -333,15 +350,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, } ExecEnv::GetInstance()->fragment_mgr()->cancel_query( cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, - fmt::format("Process has no memory available, cancel top memory usage {}: " - "{} memory tracker <{}> consumption {}, backend {} " - "process memory used {} exceed limit {} or sys mem available {} " - "less than low water mark {}. Execute again after enough memory, " - "details see be.INFO.", - TypeString[type], TypeString[type], min_pq.top().second, - print_bytes(min_pq.top().first), BackendOptions::get_localhost(), - vm_rss_str, MemInfo::mem_limit_str(), mem_available_str, - print_bytes(MemInfo::sys_mem_available_low_water_mark()))); + cancel_msg(min_pq.top().first, min_pq.top().second)); freed_mem += min_pq.top().first; usage_strings.push_back(fmt::format("{} memory usage {} Bytes", min_pq.top().second, @@ -349,38 +358,34 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, min_pq.pop(); } if (!usage_strings.empty()) { - LOG(INFO) << "Process GC Free Top Memory Usage " << TypeString[type] << ": " + LOG(INFO) << "Process GC Free Top Memory Usage " << type_string(type) << ": " << join(usage_strings, ","); } return freed_mem; }; - for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) { - std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock); - for (auto tracker : mem_tracker_limiter_pool[i].trackers) { + for (unsigned i = 1; i < tracker_groups.size(); ++i) { + std::lock_guard<std::mutex> l(tracker_groups[i].group_lock); + for (auto tracker : tracker_groups[i].trackers) { if (tracker->type() == type) { if (ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled( label_to_queryid(tracker->label()))) { continue; } if (tracker->consumption() > min_free_mem) { - std::priority_queue<std::pair<int64_t, std::string>, - std::vector<std::pair<int64_t, std::string>>, - std::greater<std::pair<int64_t, std::string>>> - min_pq_null; - std::swap(min_pq, min_pq_null); - min_pq.push(std::pair<int64_t, std::string>(tracker->consumption(), - tracker->label())); - return cancel_top_query(min_pq); + MemTrackerMinQueue min_pq_single; + min_pq_single.emplace(tracker->consumption(), tracker->label()); + return cancel_top_query(min_pq_single); } else if (tracker->consumption() + prepare_free_mem < min_free_mem) { - min_pq.push(std::pair<int64_t, std::string>(tracker->consumption(), - tracker->label())); + min_pq.emplace(tracker->consumption(), tracker->label()); prepare_free_mem += tracker->consumption(); } else if (tracker->consumption() > min_pq.top().first) { - // No need to modify prepare_free_mem, prepare_free_mem will always be greater than min_free_mem. - min_pq.push(std::pair<int64_t, std::string>(tracker->consumption(), - tracker->label())); - min_pq.pop(); + min_pq.emplace(tracker->consumption(), tracker->label()); + prepare_free_mem += tracker->consumption(); + while (prepare_free_mem - min_pq.top().first > min_free_mem) { + prepare_free_mem -= min_pq.top().first; + min_pq.pop(); + } } } } @@ -392,15 +397,33 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, const std::string& vm_rss_str, const std::string& mem_available_str, Type type) { - std::priority_queue<std::pair<int64_t, std::string>, - std::vector<std::pair<int64_t, std::string>>, - std::greater<std::pair<int64_t, std::string>>> - min_pq; + return free_top_overcommit_query( + min_free_mem, type, mem_tracker_limiter_pool, + [&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption, + const std::string& label) { + return fmt::format( + "Process has less memory, cancel top memory overcommit {}: " + "{} memory tracker <{}> consumption {}, backend {} " + "process memory used {} exceed soft limit {} or sys mem available {} " + "less than warning water mark {}. Execute again after enough memory, " + "details see be.INFO.", + type_string(type), type_string(type), label, print_bytes(mem_consumption), + BackendOptions::get_localhost(), vm_rss_str, MemInfo::soft_mem_limit_str(), + mem_available_str, + print_bytes(MemInfo::sys_mem_available_warning_water_mark())); + }); +} + +template <typename TrackerGroups> +int64_t MemTrackerLimiter::free_top_overcommit_query( + int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups, + const std::function<std::string(int64_t, const std::string&)>& cancel_msg) { + std::priority_queue<std::pair<int64_t, std::string>> max_pq; std::unordered_map<std::string, int64_t> query_consumption; - for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) { - std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock); - for (auto tracker : mem_tracker_limiter_pool[i].trackers) { + for (unsigned i = 1; i < tracker_groups.size(); ++i) { + std::lock_guard<std::mutex> l(tracker_groups[i].group_lock); + for (auto tracker : tracker_groups[i].trackers) { if (tracker->type() == type) { if (tracker->consumption() <= 33554432) { // 32M small query does not cancel continue; @@ -411,7 +434,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, } int64_t overcommit_ratio = (static_cast<double>(tracker->consumption()) / tracker->limit()) * 10000; - min_pq.push(std::pair<int64_t, std::string>(overcommit_ratio, tracker->label())); + max_pq.emplace(overcommit_ratio, tracker->label()); query_consumption[tracker->label()] = tracker->consumption(); } } @@ -422,13 +445,6 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, return 0; } - std::priority_queue<std::pair<int64_t, std::string>> max_pq; - // Min-heap to Max-heap. - while (!min_pq.empty()) { - max_pq.push(min_pq.top()); - min_pq.pop(); - } - std::vector<std::string> usage_strings; int64_t freed_mem = 0; while (!max_pq.empty()) { @@ -440,15 +456,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, int64_t query_mem = query_consumption[max_pq.top().second]; ExecEnv::GetInstance()->fragment_mgr()->cancel_query( cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, - fmt::format("Process has less memory, cancel top memory overcommit {}: " - "{} memory tracker <{}> consumption {}, backend {} " - "process memory used {} exceed soft limit {} or sys mem available {} " - "less than warning water mark {}. Execute again after enough memory, " - "details see be.INFO.", - TypeString[type], TypeString[type], max_pq.top().second, - print_bytes(query_mem), BackendOptions::get_localhost(), vm_rss_str, - MemInfo::soft_mem_limit_str(), mem_available_str, - print_bytes(MemInfo::sys_mem_available_warning_water_mark()))); + cancel_msg(query_mem, max_pq.top().second)); usage_strings.push_back(fmt::format("{} memory usage {} Bytes, overcommit ratio: {}", max_pq.top().second, query_mem, max_pq.top().first)); @@ -459,10 +467,52 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, max_pq.pop(); } if (!usage_strings.empty()) { - LOG(INFO) << "Process GC Free Top Memory Overcommit " << TypeString[type] << ": " + LOG(INFO) << "Process GC Free Top Memory Overcommit " << type_string(type) << ": " << join(usage_strings, ","); } return freed_mem; } +int64_t MemTrackerLimiter::tg_memory_limit_gc( + uint64_t id, const std::string& name, int64_t memory_limit, + std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_limiter_groups) { + int64_t used_memory = 0; + for (auto& mem_tracker_group : tracker_limiter_groups) { + std::lock_guard<std::mutex> l(mem_tracker_group.group_lock); + for (const auto& tracker : mem_tracker_group.trackers) { + used_memory += tracker->consumption(); + } + } + + if (used_memory <= memory_limit) { + return 0; + } + + int64_t need_free_mem = used_memory - memory_limit; + int64_t freed_mem = 0; + constexpr auto query_type = MemTrackerLimiter::Type::QUERY; + auto cancel_str = [id, &name, memory_limit, used_memory](int64_t mem_consumption, + const std::string& label) { + return fmt::format( + "Resource group id:{}, name:{} memory exceeded limit, cancel top memory {}: " + "memory tracker <{}> consumption {}, backend {}, " + "resource group memory used {}, memory limit {}.", + id, name, MemTrackerLimiter::type_string(query_type), label, + MemTracker::print_bytes(mem_consumption), BackendOptions::get_localhost(), + MemTracker::print_bytes(used_memory), MemTracker::print_bytes(memory_limit)); + }; + if (config::enable_query_memroy_overcommit) { + freed_mem += MemTrackerLimiter::free_top_overcommit_query( + need_free_mem - freed_mem, query_type, tracker_limiter_groups, cancel_str); + } + if (freed_mem < need_free_mem) { + freed_mem += MemTrackerLimiter::free_top_memory_query(need_free_mem - freed_mem, query_type, + tracker_limiter_groups, cancel_str); + } + LOG(INFO) << fmt::format( + "task group {} finished gc, memory_limit: {}, used_memory: {}, freed_mem: {}.", name, + memory_limit, used_memory, freed_mem); + return freed_mem; +} + } // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 7665a029f8..3cc7108b27 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -28,6 +28,7 @@ #include <list> #include <memory> #include <ostream> +#include <queue> #include <string> #include <unordered_map> #include <vector> @@ -41,6 +42,21 @@ namespace doris { +constexpr auto MEM_TRACKER_GROUP_NUM = 1000; + +namespace taskgroup { +struct TgTrackerLimiterGroup; +class TaskGroup; +using TaskGroupPtr = std::shared_ptr<TaskGroup>; +} // namespace taskgroup + +class MemTrackerLimiter; + +struct TrackerLimiterGroup { + std::list<MemTrackerLimiter*> trackers; + std::mutex group_lock; +}; + // Track and limit the memory usage of process and query. // Contains an limit, arranged into a tree structure. // @@ -49,7 +65,7 @@ namespace doris { // will be recorded on this Query, otherwise it will be recorded in Orphan Tracker by default. class MemTrackerLimiter final : public MemTracker { public: - enum Type { + enum class Type { GLOBAL = 0, // Life cycle is the same as the process, e.g. Cache and default Orphan QUERY = 1, // Count the memory consumption of all Query tasks. LOAD = 2, // Count the memory consumption of all Load tasks. @@ -76,9 +92,6 @@ public: {Type::EXPERIMENTAL, std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)}}; - inline static const std::string TypeString[] = { - "global", "query", "load", "compaction", "schema_change", "clone", "experimental"}; - public: // byte_limit equal to -1 means no consumption limit, only participate in process memory statistics. MemTrackerLimiter(Type type, const std::string& label = std::string(), int64_t byte_limit = -1, @@ -87,6 +100,28 @@ public: ~MemTrackerLimiter(); + static std::string type_string(Type type) { + switch (type) { + case Type::GLOBAL: + return "global"; + case Type::QUERY: + return "query"; + case Type::LOAD: + return "load"; + case Type::COMPACTION: + return "compaction"; + case Type::SCHEMA_CHANGE: + return "schema_change"; + case Type::CLONE: + return "clone"; + case Type::EXPERIMENTAL: + return "experimental"; + default: + LOG(FATAL) << "not match type of mem tracker limiter :" << static_cast<int>(type); + } + __builtin_unreachable(); + } + static bool sys_mem_exceed_limit_check(int64_t bytes); void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption not supported"; } @@ -118,7 +153,7 @@ public: static void refresh_global_counter(); static void refresh_all_tracker_profile(); - Snapshot make_snapshot() const; + Snapshot make_snapshot() const override; // Returns a list of all the valid tracker snapshots. static void make_process_snapshots(std::vector<MemTracker::Snapshot>* snapshots); static void make_type_snapshots(std::vector<MemTracker::Snapshot>* snapshots, Type type); @@ -137,6 +172,12 @@ public: static int64_t free_top_memory_query(int64_t min_free_mem, const std::string& vm_rss_str, const std::string& mem_available_str, Type type = Type::QUERY); + + template <typename TrackerGroups> + static int64_t free_top_memory_query( + int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups, + const std::function<std::string(int64_t, const std::string&)>& cancel_msg); + static int64_t free_top_memory_load(int64_t min_free_mem, const std::string& vm_rss_str, const std::string& mem_available_str) { return free_top_memory_query(min_free_mem, vm_rss_str, mem_available_str, Type::LOAD); @@ -146,10 +187,21 @@ public: static int64_t free_top_overcommit_query(int64_t min_free_mem, const std::string& vm_rss_str, const std::string& mem_available_str, Type type = Type::QUERY); + + template <typename TrackerGroups> + static int64_t free_top_overcommit_query( + int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups, + const std::function<std::string(int64_t, const std::string&)>& cancel_msg); + static int64_t free_top_overcommit_load(int64_t min_free_mem, const std::string& vm_rss_str, const std::string& mem_available_str) { return free_top_overcommit_query(min_free_mem, vm_rss_str, mem_available_str, Type::LOAD); } + + static int64_t tg_memory_limit_gc( + uint64_t id, const std::string& name, int64_t memory_limit, + std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_limiter_groups); + // only for Type::QUERY or Type::LOAD. static TUniqueId label_to_queryid(const std::string& label) { if (label.rfind("Query#Id=", 0) != 0 && label.rfind("Load#Id=", 0) != 0) { @@ -170,12 +222,12 @@ public: std::string tracker_limit_exceeded_str(); std::string tracker_limit_exceeded_str(int64_t bytes); - std::string debug_string() { + std::string debug_string() override { std::stringstream msg; msg << "limit: " << _limit << "; " << "consumption: " << _consumption->current_value() << "; " << "label: " << _label << "; " - << "type: " << TypeString[_type] << "; "; + << "type: " << type_string(_type) << "; "; return msg.str(); } diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 0909702c70..7158f8b054 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -73,6 +73,9 @@ public: MemTracker::print_bytes(query_mem_tracker->consumption()), MemTracker::print_bytes(query_mem_tracker->peak_consumption())); } + if (_task_group) { + _task_group->remove_mem_tracker_limiter(query_mem_tracker); + } } // Notice. For load fragments, the fragment_num sent by FE has a small probability of 0. diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp index ebe775a485..16a6356f30 100644 --- a/be/src/runtime/task_group/task_group.cpp +++ b/be/src/runtime/task_group/task_group.cpp @@ -27,10 +27,19 @@ #include <utility> #include "common/logging.h" +#include "pipeline/task_scheduler.h" +#include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker_limiter.h" +#include "service/backend_options.h" +#include "util/mem_info.h" +#include "util/parse_util.h" namespace doris { namespace taskgroup { +const static std::string CPU_SHARE = "cpu_share"; +const static std::string MEMORY_LIMIT = "memory_limit"; + pipeline::PipelineTask* TaskGroupEntity::take() { if (_queue.empty()) { return nullptr; @@ -67,36 +76,76 @@ std::string TaskGroupEntity::debug_string() const { cpu_share(), _queue.size(), _vruntime_ns); } -TaskGroup::TaskGroup(uint64_t id, std::string name, uint64_t cpu_share, int64_t version) - : _id(id), _name(name), _cpu_share(cpu_share), _task_entity(this), _version(version) {} +TaskGroup::TaskGroup(const TaskGroupInfo& tg_info) + : _id(tg_info.id), + _name(tg_info.name), + _cpu_share(tg_info.cpu_share), + _memory_limit(tg_info.memory_limit), + _version(tg_info.version), + _task_entity(this), + _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM) {} std::string TaskGroup::debug_string() const { - std::shared_lock<std::shared_mutex> rl {mutex}; - return fmt::format("TG[id = {}, name = {}, cpu_share = {}, version = {}]", _id, _name, - cpu_share(), _version); -} - -bool TaskGroup::check_version(int64_t version) const { - std::shared_lock<std::shared_mutex> rl {mutex}; - return version > _version; + std::shared_lock<std::shared_mutex> rl {_mutex}; + return fmt::format("TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, version = {}]", + _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES), + _version); } void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) { - if (tg_info._id != _id) { + if (UNLIKELY(tg_info.id != _id)) { return; } + { + std::shared_lock<std::shared_mutex> rl {_mutex}; + if (LIKELY(tg_info.version <= _version)) { + return; + } + } + + std::lock_guard<std::shared_mutex> wl {_mutex}; + if (tg_info.version > _version) { + _name = tg_info.name; + _version = tg_info.version; + _memory_limit = tg_info.memory_limit; + if (_cpu_share != tg_info.cpu_share) { + ExecEnv::GetInstance()->pipeline_task_group_scheduler()->update_tg_cpu_share( + tg_info, shared_from_this()); + } + } +} + +void TaskGroup::update_cpu_share_unlock(const TaskGroupInfo& tg_info) { + _cpu_share = tg_info.cpu_share; +} - std::lock_guard<std::shared_mutex> wl {mutex}; - if (tg_info._version > _version) { - _name = tg_info._name; - _cpu_share = tg_info._cpu_share; - _version = tg_info._version; +void TaskGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) { + auto group_num = mem_tracker_ptr->group_num(); + std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock); + _mem_tracker_limiter_pool[group_num].trackers.insert(mem_tracker_ptr); +} + +void TaskGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) { + auto group_num = mem_tracker_ptr->group_num(); + std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock); + _mem_tracker_limiter_pool[group_num].trackers.erase(mem_tracker_ptr); +} + +int64_t TaskGroup::memory_limit_gc() { + std::string name; + int64_t memory_limit; + { + std::shared_lock<std::shared_mutex> rl {_mutex}; + name = _name; + memory_limit = _memory_limit; } + return MemTrackerLimiter::tg_memory_limit_gc(_id, name, memory_limit, + _mem_tracker_limiter_pool); } Status TaskGroupInfo::parse_group_info(const TPipelineResourceGroup& resource_group, TaskGroupInfo* task_group_info) { - if (!check_group_info(resource_group)) { + if (UNLIKELY(!check_group_info(resource_group))) { std::stringstream ss; ss << "incomplete resource group parameters: "; resource_group.printTo(ss); @@ -108,17 +157,31 @@ Status TaskGroupInfo::parse_group_info(const TPipelineResourceGroup& resource_gr uint64_t share = 0; std::from_chars(iter->second.c_str(), iter->second.c_str() + iter->second.size(), share); - task_group_info->_id = resource_group.id; - task_group_info->_name = resource_group.name; - task_group_info->_version = resource_group.version; - task_group_info->_cpu_share = share; + task_group_info->id = resource_group.id; + task_group_info->name = resource_group.name; + task_group_info->version = resource_group.version; + task_group_info->cpu_share = share; + + bool is_percent = true; + auto mem_limit_str = resource_group.properties.find(MEMORY_LIMIT)->second; + auto mem_limit = + ParseUtil::parse_mem_spec(mem_limit_str, -1, MemInfo::mem_limit(), &is_percent); + if (UNLIKELY(mem_limit <= 0)) { + std::stringstream ss; + ss << "parse memory limit from TPipelineResourceGroup error, " << MEMORY_LIMIT << ": " + << mem_limit_str; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + task_group_info->memory_limit = mem_limit; return Status::OK(); } bool TaskGroupInfo::check_group_info(const TPipelineResourceGroup& resource_group) { return resource_group.__isset.id && resource_group.__isset.version && resource_group.__isset.name && resource_group.__isset.properties && - resource_group.properties.count(CPU_SHARE) > 0; + resource_group.properties.count(CPU_SHARE) > 0 && + resource_group.properties.count(MEMORY_LIMIT) > 0; } } // namespace taskgroup diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h index e66ef2e1b0..a7854158d2 100644 --- a/be/src/runtime/task_group/task_group.h +++ b/be/src/runtime/task_group/task_group.h @@ -25,6 +25,7 @@ #include <queue> #include <shared_mutex> #include <string> +#include <unordered_set> #include "common/status.h" @@ -35,14 +36,13 @@ class PipelineTask; } class TPipelineResourceGroup; +class MemTrackerLimiter; namespace taskgroup { class TaskGroup; struct TaskGroupInfo; -const static std::string CPU_SHARE = "cpu_share"; - class TaskGroupEntity { public: explicit TaskGroupEntity(taskgroup::TaskGroup* ts) : _tg(ts) {} @@ -72,9 +72,14 @@ private: using TGEntityPtr = TaskGroupEntity*; -class TaskGroup { +struct TgTrackerLimiterGroup { + std::unordered_set<std::shared_ptr<MemTrackerLimiter>> trackers; + std::mutex group_lock; +}; + +class TaskGroup : public std::enable_shared_from_this<TaskGroup> { public: - TaskGroup(uint64_t id, std::string name, uint64_t cpu_share, int64_t version); + explicit TaskGroup(const TaskGroupInfo& tg_info); TaskGroupEntity* task_entity() { return &_task_entity; } @@ -84,26 +89,36 @@ public: std::string debug_string() const; - bool check_version(int64_t version) const; - void check_and_update(const TaskGroupInfo& tg_info); + void update_cpu_share_unlock(const TaskGroupInfo& tg_info); + + void add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr); + + void remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr); + + int64_t memory_limit_gc(); + private: - mutable std::shared_mutex mutex; + mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit const uint64_t _id; std::string _name; std::atomic<uint64_t> _cpu_share; - TaskGroupEntity _task_entity; + int64_t _memory_limit; // bytes int64_t _version; + TaskGroupEntity _task_entity; + + std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool; }; using TaskGroupPtr = std::shared_ptr<TaskGroup>; struct TaskGroupInfo { - uint64_t _id; - std::string _name; - uint64_t _cpu_share; - int64_t _version; + uint64_t id; + std::string name; + uint64_t cpu_share; + int64_t version; + int64_t memory_limit; static Status parse_group_info(const TPipelineResourceGroup& resource_group, TaskGroupInfo* task_group_info); diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp index d393264983..c741a0bffd 100644 --- a/be/src/runtime/task_group/task_group_manager.cpp +++ b/be/src/runtime/task_group/task_group_manager.cpp @@ -20,6 +20,7 @@ #include <memory> #include <mutex> +#include "runtime/memory/mem_tracker_limiter.h" #include "runtime/task_group/task_group.h" namespace doris::taskgroup { @@ -35,20 +36,38 @@ TaskGroupManager* TaskGroupManager::instance() { TaskGroupPtr TaskGroupManager::get_or_create_task_group(const TaskGroupInfo& task_group_info) { { std::shared_lock<std::shared_mutex> r_lock(_group_mutex); - if (_task_groups.count(task_group_info._id)) { - return _task_groups[task_group_info._id]; + if (LIKELY(_task_groups.count(task_group_info.id))) { + auto task_group = _task_groups[task_group_info.id]; + task_group->check_and_update(task_group_info); + return task_group; } } - auto new_task_group = - std::make_shared<TaskGroup>(task_group_info._id, task_group_info._name, - task_group_info._cpu_share, task_group_info._version); + auto new_task_group = std::make_shared<TaskGroup>(task_group_info); std::lock_guard<std::shared_mutex> w_lock(_group_mutex); - if (_task_groups.count(task_group_info._id)) { - return _task_groups[task_group_info._id]; + if (_task_groups.count(task_group_info.id)) { + auto task_group = _task_groups[task_group_info.id]; + task_group->check_and_update(task_group_info); + return task_group; } - _task_groups[task_group_info._id] = new_task_group; + _task_groups[task_group_info.id] = new_task_group; return new_task_group; } +int64_t TaskGroupManager::memory_limit_gc() { + int64_t total_free_memory = 0; + std::vector<TaskGroupPtr> task_groups; + { + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + task_groups.reserve(_task_groups.size()); + for (const auto& [id, task_group] : _task_groups) { + task_groups.push_back(task_group); + } + } + for (const auto& task_group : task_groups) { + total_free_memory += task_group->memory_limit_gc(); + } + return total_free_memory; +} + } // namespace doris::taskgroup diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h index 0f415498a0..c053a3a45d 100644 --- a/be/src/runtime/task_group/task_group_manager.h +++ b/be/src/runtime/task_group/task_group_manager.h @@ -33,6 +33,8 @@ public: TaskGroupPtr get_or_create_task_group(const TaskGroupInfo& task_group_info); + int64_t memory_limit_gc(); + private: std::shared_mutex _group_mutex; std::unordered_map<uint64_t, TaskGroupPtr> _task_groups; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 3a02eb67c0..4531535983 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -871,10 +871,6 @@ public class StmtExecutor { || (parsedStmt instanceof InsertStmt && !((InsertStmt) parsedStmt).needLoadManager()) || parsedStmt instanceof CreateTableAsSelectStmt || parsedStmt instanceof InsertOverwriteTableStmt) { - if (Config.enable_resource_group && context.sessionVariable.enablePipelineEngine()) { - analyzer.setResourceGroups(analyzer.getEnv().getResourceGroupMgr() - .getResourceGroup(context.sessionVariable.resourceGroup)); - } Map<Long, TableIf> tableMap = Maps.newTreeMap(); QueryStmt queryStmt; Set<String> parentViewNameSet = Sets.newHashSet(); @@ -1060,6 +1056,11 @@ public class StmtExecutor { parsedStmt.setIsExplain(explainOptions); } } + if (parsedStmt instanceof QueryStmt && Config.enable_resource_group + && context.sessionVariable.enablePipelineEngine()) { + analyzer.setResourceGroups(analyzer.getEnv().getResourceGroupMgr() + .getResourceGroup(context.sessionVariable.resourceGroup)); + } } profile.getSummaryProfile().setQueryAnalysisFinishTime(); planner = new OriginalPlanner(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java index 538c064247..b859b95752 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java @@ -30,6 +30,8 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; @@ -38,14 +40,17 @@ import java.util.HashMap; import java.util.Map; public class ResourceGroup implements Writable { + private static final Logger LOG = LogManager.getLogger(ResourceGroup.class); public static final String CPU_SHARE = "cpu_share"; + public static final String MEMORY_LIMIT = "memory_limit"; + private static final ImmutableSet<String> REQUIRED_PROPERTIES_NAME = new ImmutableSet.Builder<String>().add( - CPU_SHARE).build(); + CPU_SHARE).add(MEMORY_LIMIT).build(); private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>().add( - CPU_SHARE).build(); + CPU_SHARE).add(MEMORY_LIMIT).build(); @SerializedName(value = "id") private long id; @@ -60,11 +65,10 @@ public class ResourceGroup implements Writable { @SerializedName(value = "version") private long version; + private double memoryLimitPercent; + private ResourceGroup(long id, String name, Map<String, String> properties) { - this.id = id; - this.name = name; - this.properties = properties; - this.version = 0; + this(id, name, properties, 0); } private ResourceGroup(long id, String name, Map<String, String> properties, long version) { @@ -72,6 +76,8 @@ public class ResourceGroup implements Writable { this.name = name; this.properties = properties; this.version = version; + String memoryLimitString = properties.get(MEMORY_LIMIT); + this.memoryLimitPercent = Double.parseDouble(memoryLimitString.substring(0, memoryLimitString.length() - 1)); } public static ResourceGroup create(String name, Map<String, String> properties) throws DdlException { @@ -79,10 +85,9 @@ public class ResourceGroup implements Writable { return new ResourceGroup(Env.getCurrentEnv().getNextId(), name, properties); } - public static ResourceGroup create(ResourceGroup resourceGroup, Map<String, String> updateProperties) + public static ResourceGroup copyAndUpdate(ResourceGroup resourceGroup, Map<String, String> updateProperties) throws DdlException { - Map<String, String> newProperties = new HashMap<>(); - newProperties.putAll(resourceGroup.getProperties()); + Map<String, String> newProperties = new HashMap<>(resourceGroup.getProperties()); for (Map.Entry<String, String> kv : updateProperties.entrySet()) { if (!Strings.isNullOrEmpty(kv.getValue())) { newProperties.put(kv.getKey(), kv.getValue()); @@ -108,7 +113,21 @@ public class ResourceGroup implements Writable { String cpuSchedulingWeight = properties.get(CPU_SHARE); if (!StringUtils.isNumeric(cpuSchedulingWeight) || Long.parseLong(cpuSchedulingWeight) <= 0) { - throw new DdlException(CPU_SHARE + " requires a positive integer."); + throw new DdlException(CPU_SHARE + " " + cpuSchedulingWeight + " requires a positive integer."); + } + + String memoryLimit = properties.get(MEMORY_LIMIT); + if (!memoryLimit.endsWith("%")) { + throw new DdlException(MEMORY_LIMIT + " " + memoryLimit + " requires a percentage and ends with a '%'"); + } + String memLimitErr = MEMORY_LIMIT + " " + memoryLimit + " requires a positive floating point number."; + try { + if (Double.parseDouble(memoryLimit.substring(0, memoryLimit.length() - 1)) <= 0) { + throw new DdlException(memLimitErr); + } + } catch (NumberFormatException e) { + LOG.debug(memLimitErr, e); + throw new DdlException(memLimitErr); } } @@ -128,6 +147,10 @@ public class ResourceGroup implements Writable { return version; } + public double getMemoryLimitPercent() { + return memoryLimitPercent; + } + public void getProcNodeData(BaseProcResult result) { for (Map.Entry<String, String> entry : properties.entrySet()) { result.addRow(Lists.newArrayList(String.valueOf(id), name, entry.getKey(), entry.getValue())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java index d4e824365c..d4d22790d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java @@ -46,6 +46,7 @@ import java.io.DataOutput; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ResourceGroupMgr implements Writable, GsonPostProcessable { @@ -122,6 +123,7 @@ public class ResourceGroupMgr implements Writable, GsonPostProcessable { } Map<String, String> properties = Maps.newHashMap(); properties.put(ResourceGroup.CPU_SHARE, "10"); + properties.put(ResourceGroup.MEMORY_LIMIT, "100%"); defaultResourceGroup = ResourceGroup.create(DEFAULT_GROUP_NAME, properties); nameToResourceGroup.put(DEFAULT_GROUP_NAME, defaultResourceGroup); idToResourceGroup.put(defaultResourceGroup.getId(), defaultResourceGroup); @@ -141,12 +143,14 @@ public class ResourceGroupMgr implements Writable, GsonPostProcessable { String resourceGroupName = resourceGroup.getName(); writeLock(); try { - if (nameToResourceGroup.putIfAbsent(resourceGroupName, resourceGroup) != null) { + if (nameToResourceGroup.containsKey(resourceGroupName)) { if (stmt.isIfNotExists()) { return; } throw new DdlException("Resource group " + resourceGroupName + " already exist"); } + checkGlobalUnlock(resourceGroup, null); + nameToResourceGroup.put(resourceGroupName, resourceGroup); idToResourceGroup.put(resourceGroup.getId(), resourceGroup); Env.getCurrentEnv().getEditLog().logCreateResourceGroup(resourceGroup); } finally { @@ -155,6 +159,18 @@ public class ResourceGroupMgr implements Writable, GsonPostProcessable { LOG.info("Create resource group success: {}", resourceGroup); } + private void checkGlobalUnlock(ResourceGroup resourceGroup, ResourceGroup old) throws DdlException { + double totalMemoryLimit = idToResourceGroup.values().stream().mapToDouble(ResourceGroup::getMemoryLimitPercent) + .sum() + resourceGroup.getMemoryLimitPercent(); + if (!Objects.isNull(old)) { + totalMemoryLimit -= old.getMemoryLimitPercent(); + } + if (totalMemoryLimit > 100.0 + 1e-6) { + throw new DdlException( + "The sum of all resource group " + ResourceGroup.MEMORY_LIMIT + " cannot be greater than 100.0%."); + } + } + public void alterResourceGroup(AlterResourceGroupStmt stmt) throws DdlException { checkResourceGroupEnabled(); @@ -167,7 +183,8 @@ public class ResourceGroupMgr implements Writable, GsonPostProcessable { throw new DdlException("Resource Group(" + resourceGroupName + ") does not exist."); } ResourceGroup resourceGroup = nameToResourceGroup.get(resourceGroupName); - newResourceGroup = ResourceGroup.create(resourceGroup, properties); + newResourceGroup = ResourceGroup.copyAndUpdate(resourceGroup, properties); + checkGlobalUnlock(newResourceGroup, resourceGroup); nameToResourceGroup.put(resourceGroupName, newResourceGroup); idToResourceGroup.put(newResourceGroup.getId(), newResourceGroup); Env.getCurrentEnv().getEditLog().logAlterResourceGroup(newResourceGroup); @@ -181,7 +198,7 @@ public class ResourceGroupMgr implements Writable, GsonPostProcessable { checkResourceGroupEnabled(); String resourceGroupName = stmt.getResourceGroupName(); - if (resourceGroupName == DEFAULT_GROUP_NAME) { + if (DEFAULT_GROUP_NAME.equals(resourceGroupName)) { throw new DdlException("Dropping default resource group " + resourceGroupName + " is not allowed"); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgrTest.java index 4a0f18914a..f6562f584a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgrTest.java @@ -82,6 +82,7 @@ public class ResourceGroupMgrTest { ResourceGroupMgr resourceGroupMgr = new ResourceGroupMgr(); Map<String, String> properties1 = Maps.newHashMap(); properties1.put(ResourceGroup.CPU_SHARE, "10"); + properties1.put(ResourceGroup.MEMORY_LIMIT, "30%"); String name1 = "g1"; CreateResourceGroupStmt stmt1 = new CreateResourceGroupStmt(false, name1, properties1); resourceGroupMgr.createResourceGroup(stmt1); @@ -98,6 +99,7 @@ public class ResourceGroupMgrTest { Map<String, String> properties2 = Maps.newHashMap(); properties2.put(ResourceGroup.CPU_SHARE, "20"); + properties2.put(ResourceGroup.MEMORY_LIMIT, "30%"); String name2 = "g2"; CreateResourceGroupStmt stmt2 = new CreateResourceGroupStmt(false, name2, properties2); resourceGroupMgr.createResourceGroup(stmt2); @@ -129,6 +131,7 @@ public class ResourceGroupMgrTest { ResourceGroupMgr resourceGroupMgr = new ResourceGroupMgr(); Map<String, String> properties1 = Maps.newHashMap(); properties1.put(ResourceGroup.CPU_SHARE, "10"); + properties1.put(ResourceGroup.MEMORY_LIMIT, "30%"); String name1 = "g1"; CreateResourceGroupStmt stmt1 = new CreateResourceGroupStmt(false, name1, properties1); resourceGroupMgr.createResourceGroup(stmt1); @@ -152,6 +155,7 @@ public class ResourceGroupMgrTest { ResourceGroupMgr resourceGroupMgr = new ResourceGroupMgr(); Map<String, String> properties = Maps.newHashMap(); properties.put(ResourceGroup.CPU_SHARE, "10"); + properties.put(ResourceGroup.MEMORY_LIMIT, "30%"); String name = "g1"; CreateResourceGroupStmt createStmt = new CreateResourceGroupStmt(false, name, properties); resourceGroupMgr.createResourceGroup(createStmt); @@ -188,11 +192,13 @@ public class ResourceGroupMgrTest { } properties.put(ResourceGroup.CPU_SHARE, "10"); + properties.put(ResourceGroup.MEMORY_LIMIT, "30%"); CreateResourceGroupStmt createStmt = new CreateResourceGroupStmt(false, name, properties); resourceGroupMgr.createResourceGroup(createStmt); Map<String, String> newProperties = Maps.newHashMap(); newProperties.put(ResourceGroup.CPU_SHARE, "5"); + newProperties.put(ResourceGroup.MEMORY_LIMIT, "30%"); AlterResourceGroupStmt stmt2 = new AlterResourceGroupStmt(name, newProperties); resourceGroupMgr.alterResourceGroup(stmt2); diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java index 978d28b8f1..9f174e201c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java @@ -20,7 +20,6 @@ package org.apache.doris.resource.resourcegroup; import org.apache.doris.common.DdlException; import org.apache.doris.common.proc.BaseProcResult; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Test; @@ -34,17 +33,20 @@ public class ResourceGroupTest { public void testCreateNormal() throws DdlException { Map<String, String> properties1 = Maps.newHashMap(); properties1.put(ResourceGroup.CPU_SHARE, "10"); + properties1.put(ResourceGroup.MEMORY_LIMIT, "30%"); String name1 = "g1"; ResourceGroup group1 = ResourceGroup.create(name1, properties1); Assert.assertEquals(name1, group1.getName()); - Assert.assertEquals(1, group1.getProperties().size()); + Assert.assertEquals(2, group1.getProperties().size()); Assert.assertTrue(group1.getProperties().containsKey(ResourceGroup.CPU_SHARE)); + Assert.assertTrue(Math.abs(group1.getMemoryLimitPercent() - 30) < 1e-6); } @Test(expected = DdlException.class) public void testNotSupportProperty() throws DdlException { Map<String, String> properties1 = Maps.newHashMap(); properties1.put(ResourceGroup.CPU_SHARE, "10"); + properties1.put(ResourceGroup.MEMORY_LIMIT, "30%"); properties1.put("share", "10"); String name1 = "g1"; ResourceGroup.create(name1, properties1); @@ -61,12 +63,13 @@ public class ResourceGroupTest { public void testCpuShareValue() { Map<String, String> properties1 = Maps.newHashMap(); properties1.put(ResourceGroup.CPU_SHARE, "0"); + properties1.put(ResourceGroup.MEMORY_LIMIT, "30%"); String name1 = "g1"; try { ResourceGroup.create(name1, properties1); Assert.fail(); } catch (DdlException e) { - Assert.assertTrue(e.getMessage().contains(ResourceGroup.CPU_SHARE + " requires a positive integer.")); + Assert.assertTrue(e.getMessage().contains("requires a positive integer.")); } properties1.put(ResourceGroup.CPU_SHARE, "cpu"); @@ -74,7 +77,7 @@ public class ResourceGroupTest { ResourceGroup.create(name1, properties1); Assert.fail(); } catch (DdlException e) { - Assert.assertTrue(e.getMessage().contains(ResourceGroup.CPU_SHARE + " requires a positive integer.")); + Assert.assertTrue(e.getMessage().contains("requires a positive integer.")); } } @@ -82,19 +85,13 @@ public class ResourceGroupTest { public void testGetProcNodeData() throws DdlException { Map<String, String> properties1 = Maps.newHashMap(); properties1.put(ResourceGroup.CPU_SHARE, "10"); + properties1.put(ResourceGroup.MEMORY_LIMIT, "30%"); String name1 = "g1"; ResourceGroup group1 = ResourceGroup.create(name1, properties1); BaseProcResult result = new BaseProcResult(); group1.getProcNodeData(result); List<List<String>> rows = result.getRows(); - Assert.assertEquals(1, rows.size()); - List<List<String>> expectedRows = Lists.newArrayList(); - expectedRows.add(Lists.newArrayList(String.valueOf(group1.getId()), name1, ResourceGroup.CPU_SHARE, "10")); - for (int i = 0; i < expectedRows.size(); ++i) { - for (int j = 0; j < expectedRows.get(i).size(); ++j) { - Assert.assertEquals(expectedRows.get(i).get(j), rows.get(i).get(j)); - } - } + Assert.assertEquals(2, rows.size()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org