This is an automated email from the ASF dual-hosted git repository. mrhhsg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0217028fad526ebb43f57e974735e7a24c9186dd Author: yiguolei <676222...@qq.com> AuthorDate: Tue Sep 10 16:06:45 2024 +0800 move paused query and handle logic to workload group manager (#40603) Issue Number: close #xxx <!--Describe your changes.--> --------- Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/common/daemon.cpp | 9 +- be/src/pipeline/pipeline_task.cpp | 8 +- be/src/pipeline/task_scheduler.cpp | 170 +-------------------- be/src/pipeline/task_scheduler.h | 33 ---- .../workload_group/workload_group_manager.cpp | 157 +++++++++++++++++++ .../workload_group/workload_group_manager.h | 35 +++++ 6 files changed, 206 insertions(+), 206 deletions(-) diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 713813b4a33..c3f8d89de82 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -298,15 +298,18 @@ void Daemon::memory_maintenance_thread() { // step 6. Refresh weighted memory ratio of workload groups. doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit(); - // step 7. Analyze blocking queries. + // step 7: handle paused queries(caused by memory insufficient) + doris::ExecEnv::GetInstance()->workload_group_mgr()->handle_paused_queries(); + + // step 8. Analyze blocking queries. // TODO sort the operators that can spill, wake up the pipeline task spill // or continue execution according to certain rules or cancel query. - // step 8. Flush memtable + // step 9. Flush memtable doris::GlobalMemoryArbitrator::notify_memtable_memory_refresh(); // TODO notify flush memtable - // step 9. Jemalloc purge all arena dirty pages + // step 10. Jemalloc purge all arena dirty pages je_purge_dirty_pages(); } } diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 10309000ced..d328018c27f 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -34,8 +34,10 @@ #include "pipeline/task_queue.h" #include "pipeline/task_scheduler.h" #include "runtime/descriptors.h" +#include "runtime/exec_env.h" #include "runtime/query_context.h" #include "runtime/thread_context.h" +#include "runtime/workload_group/workload_group_manager.h" #include "util/container_util.hpp" #include "util/defer_op.h" #include "util/mem_info.h" @@ -399,7 +401,8 @@ Status PipelineTask::execute(bool* eos) { } _memory_sufficient_dependency->block(); - _state->get_query_ctx()->get_pipe_exec_scheduler()->add_paused_task(this); + ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query( + _state->get_query_ctx()->shared_from_this()); continue; } has_enough_memory = false; @@ -436,7 +439,8 @@ Status PipelineTask::execute(bool* eos) { LOG(INFO) << "query: " << print_id(query_id) << ", task: " << (void*)this << ", insufficient memory. reserve_size: " << reserve_size; _memory_sufficient_dependency->block(); - _state->get_query_ctx()->get_pipe_exec_scheduler()->add_paused_task(this); + ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query( + _state->get_query_ctx()->shared_from_this()); break; } } diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index a4379f73b64..715feceed98 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -62,8 +62,8 @@ Status TaskScheduler::start() { // some for pipeline task running // 1 for spill disk query handler RETURN_IF_ERROR(ThreadPoolBuilder(_name) - .set_min_threads(cores + 1) - .set_max_threads(cores + 1) + .set_min_threads(cores) + .set_max_threads(cores) .set_max_queue_size(0) .set_cgroup_cpu_ctl(_cgroup_cpu_ctl) .build(&_fix_thread_pool)); @@ -71,8 +71,6 @@ Status TaskScheduler::start() { for (size_t i = 0; i < cores; ++i) { RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); })); } - - RETURN_IF_ERROR(_fix_thread_pool->submit_func([this] { _paused_queries_handler(); })); return Status::OK(); } @@ -199,169 +197,6 @@ void TaskScheduler::_do_work(size_t index) { } } -void TaskScheduler::add_paused_task(PipelineTask* task) { - std::lock_guard<std::mutex> lock(_paused_queries_lock); - auto query_ctx_sptr = task->runtime_state()->get_query_ctx()->shared_from_this(); - DCHECK(query_ctx_sptr != nullptr); - auto wg = query_ctx_sptr->workload_group(); - auto&& [it, inserted] = _paused_queries_list[wg].emplace(std::move(query_ctx_sptr)); - if (inserted) { - LOG(INFO) << "here insert one new paused query: " << it->query_id() - << ", wg: " << (void*)(wg.get()); - } - - _paused_queries_cv.notify_all(); -} - -/** - * Strategy 1: A revocable query should not have any running task(PipelineTask). - * strategy 2: If the workload group is below low water mark, we make all queries in this wg runnable. - * strategy 3: Pick the query which has the max revocable size to revoke memory. - * strategy 4: If all queries are not revocable and they all have not any running task, - * we choose the max memory usage query to cancel. - */ -void TaskScheduler::_paused_queries_handler() { - while (!_need_to_stop) { - { - std::unique_lock<std::mutex> lock(_paused_queries_lock); - if (_paused_queries_list.empty()) { - _paused_queries_cv.wait(lock, [&] { return !_paused_queries_list.empty(); }); - } - - if (_need_to_stop) { - break; - } - - if (_paused_queries_list.empty()) { - continue; - } - - for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { - auto& queries_list = it->second; - const auto& wg = it->first; - if (queries_list.empty()) { - LOG(INFO) << "wg: " << wg->debug_string() << " has no paused query"; - it = _paused_queries_list.erase(it); - continue; - } - - bool is_low_wartermark = false; - bool is_high_wartermark = false; - - wg->check_mem_used(&is_low_wartermark, &is_high_wartermark); - - if (!is_low_wartermark && !is_high_wartermark) { - LOG(INFO) << "**** there are " << queries_list.size() << " to resume"; - for (const auto& query : queries_list) { - LOG(INFO) << "**** resume paused query: " << query.query_id(); - query.query_ctx->set_memory_sufficient(true); - } - - queries_list.clear(); - it = _paused_queries_list.erase(it); - continue; - } else { - ++it; - } - - std::shared_ptr<QueryContext> max_revocable_query; - std::shared_ptr<QueryContext> max_memory_usage_query; - std::shared_ptr<QueryContext> running_query; - bool has_running_query = false; - size_t max_revocable_size = 0; - size_t max_memory_usage = 0; - auto it_to_remove = queries_list.end(); - - for (auto query_it = queries_list.begin(); query_it != queries_list.end();) { - const auto& query_ctx = query_it->query_ctx; - size_t revocable_size = 0; - size_t memory_usage = 0; - bool has_running_task = false; - - if (query_ctx->is_cancelled()) { - LOG(INFO) << "query: " << print_id(query_ctx->query_id()) - << "was canceled, remove from paused list"; - query_it = queries_list.erase(query_it); - continue; - } - - query_ctx->get_revocable_info(&revocable_size, &memory_usage, - &has_running_task); - if (has_running_task) { - has_running_query = true; - running_query = query_ctx; - break; - } else if (revocable_size > max_revocable_size) { - max_revocable_query = query_ctx; - max_revocable_size = revocable_size; - it_to_remove = query_it; - } else if (memory_usage > max_memory_usage) { - max_memory_usage_query = query_ctx; - max_memory_usage = memory_usage; - it_to_remove = query_it; - } - - ++query_it; - } - - if (has_running_query) { - LOG(INFO) << "has running task, query: " << print_id(running_query->query_id()); - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - } else if (max_revocable_query) { - queries_list.erase(it_to_remove); - queries_list.insert(queries_list.begin(), max_revocable_query); - - auto revocable_tasks = max_revocable_query->get_revocable_tasks(); - DCHECK(!revocable_tasks.empty()); - - LOG(INFO) << "query: " << print_id(max_revocable_query->query_id()) << ", has " - << revocable_tasks.size() - << " tasks to revoke memory, max revocable size: " - << max_revocable_size; - SCOPED_ATTACH_TASK(max_revocable_query.get()); - for (auto* task : revocable_tasks) { - auto st = task->revoke_memory(); - if (!st.ok()) { - max_revocable_query->cancel(st); - break; - } - } - } else if (max_memory_usage_query) { - bool new_is_low_wartermark = false; - bool new_is_high_wartermark = false; - const auto query_id = print_id(max_memory_usage_query->query_id()); - wg->check_mem_used(&new_is_low_wartermark, &new_is_high_wartermark); - if (new_is_high_wartermark) { - if (it_to_remove->elapsed_time() < 2000) { - LOG(INFO) << "memory insufficient and cannot find revocable query, " - "the max usage query: " - << query_id << ", usage: " << max_memory_usage - << ", elapsed: " << it_to_remove->elapsed_time() - << ", wg info: " << wg->debug_string(); - continue; - } - max_memory_usage_query->cancel(Status::InternalError( - "memory insufficient and cannot find revocable query, cancel " - "the " - "biggest usage({}) query({})", - max_memory_usage, query_id)); - queries_list.erase(it_to_remove); - - } else { - LOG(INFO) << "non high water mark, resume " - "the query: " - << query_id << ", usage: " << max_memory_usage - << ", wg info: " << wg->debug_string(); - max_memory_usage_query->set_memory_sufficient(true); - queries_list.erase(it_to_remove); - } - } - } - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } -} - void TaskScheduler::stop() { if (!_shutdown) { if (_task_queue) { @@ -369,7 +204,6 @@ void TaskScheduler::stop() { } if (_fix_thread_pool) { _need_to_stop = true; - _paused_queries_cv.notify_all(); _fix_thread_pool->shutdown(); _fix_thread_pool->wait(); } diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index bed38887037..8ad510fd8a8 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -46,31 +46,6 @@ class TaskQueue; namespace doris::pipeline { -struct PausedQuery { - std::shared_ptr<QueryContext> query_ctx; - std::chrono::system_clock::time_point enqueue_at; - size_t last_mem_usage {0}; - - PausedQuery(std::shared_ptr<QueryContext> query_ctx_) - : query_ctx(std::move(query_ctx_)), _query_id(print_id(query_ctx->query_id())) { - enqueue_at = std::chrono::system_clock::now(); - } - - int64_t elapsed_time() const { - auto now = std::chrono::system_clock::now(); - return std::chrono::duration_cast<std::chrono::milliseconds>(now - enqueue_at).count(); - } - - std::string query_id() const { return _query_id; } - - bool operator<(const PausedQuery& other) const { return _query_id < other._query_id; } - - bool operator==(const PausedQuery& other) const { return _query_id == other._query_id; } - -private: - std::string _query_id; -}; - class TaskScheduler { public: TaskScheduler(ExecEnv* exec_env, std::shared_ptr<TaskQueue> task_queue, std::string name, @@ -89,8 +64,6 @@ public: std::vector<int> thread_debug_info() { return _fix_thread_pool->debug_info(); } - void add_paused_task(PipelineTask* task); - private: std::unique_ptr<ThreadPool> _fix_thread_pool; std::shared_ptr<TaskQueue> _task_queue; @@ -99,12 +72,6 @@ private: std::string _name; CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; - std::map<WorkloadGroupPtr, std::set<PausedQuery>> _paused_queries_list; - std::mutex _paused_queries_lock; - std::condition_variable _paused_queries_cv; - void _do_work(size_t index); - - void _paused_queries_handler(); }; } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 65a8e3685c8..7b1a0ad87db 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -34,6 +34,11 @@ namespace doris { +PausedQuery::PausedQuery(std::shared_ptr<QueryContext> query_ctx) + : query_ctx_(query_ctx), query_id_(print_id(query_ctx->query_id())) { + enqueue_at = std::chrono::system_clock::now(); +} + WorkloadGroupPtr WorkloadGroupMgr::get_or_create_workload_group( const WorkloadGroupInfo& workload_group_info) { { @@ -270,6 +275,158 @@ void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) { } } +void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& query_ctx) { + std::lock_guard<std::mutex> lock(_paused_queries_lock); + DCHECK(query_ctx != nullptr); + auto wg = query_ctx->workload_group(); + auto&& [it, inserted] = _paused_queries_list[wg].emplace(query_ctx); + if (inserted) { + LOG(INFO) << "here insert one new paused query: " << it->query_id() + << ", wg: " << (void*)(wg.get()); + } +} + +/** + * Strategy 1: A revocable query should not have any running task(PipelineTask). + * strategy 2: If the workload group is below low water mark, we make all queries in this wg runnable. + * strategy 3: Pick the query which has the max revocable size to revoke memory. + * strategy 4: If all queries are not revocable and they all have not any running task, + * we choose the max memory usage query to cancel. + */ +void WorkloadGroupMgr::handle_paused_queries() { + std::unique_lock<std::mutex> lock(_paused_queries_lock); + if (_paused_queries_list.empty()) { + return; + } + + for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { + auto& queries_list = it->second; + const auto& wg = it->first; + if (queries_list.empty()) { + LOG(INFO) << "wg: " << wg->debug_string() << " has no paused query"; + it = _paused_queries_list.erase(it); + continue; + } + + bool is_low_wartermark = false; + bool is_high_wartermark = false; + + wg->check_mem_used(&is_low_wartermark, &is_high_wartermark); + + if (!is_low_wartermark && !is_high_wartermark) { + LOG(INFO) << "**** there are " << queries_list.size() << " to resume"; + for (const auto& query : queries_list) { + LOG(INFO) << "**** resume paused query: " << query.query_id(); + auto query_ctx = query.query_ctx_.lock(); + if (query_ctx != nullptr) { + query_ctx->set_memory_sufficient(true); + } + } + + queries_list.clear(); + it = _paused_queries_list.erase(it); + continue; + } else { + ++it; + } + + std::shared_ptr<QueryContext> max_revocable_query; + std::shared_ptr<QueryContext> max_memory_usage_query; + std::shared_ptr<QueryContext> running_query; + bool has_running_query = false; + size_t max_revocable_size = 0; + size_t max_memory_usage = 0; + auto it_to_remove = queries_list.end(); + + for (auto query_it = queries_list.begin(); query_it != queries_list.end();) { + const auto query_ctx = query_it->query_ctx_.lock(); + // The query is finished during in paused list. + if (query_ctx == nullptr) { + query_it = queries_list.erase(query_it); + continue; + } + size_t revocable_size = 0; + size_t memory_usage = 0; + bool has_running_task = false; + + if (query_ctx->is_cancelled()) { + LOG(INFO) << "query: " << print_id(query_ctx->query_id()) + << "was canceled, remove from paused list"; + query_it = queries_list.erase(query_it); + continue; + } + + query_ctx->get_revocable_info(&revocable_size, &memory_usage, &has_running_task); + if (has_running_task) { + has_running_query = true; + running_query = query_ctx; + break; + } else if (revocable_size > max_revocable_size) { + max_revocable_query = query_ctx; + max_revocable_size = revocable_size; + it_to_remove = query_it; + } else if (memory_usage > max_memory_usage) { + max_memory_usage_query = query_ctx; + max_memory_usage = memory_usage; + it_to_remove = query_it; + } + + ++query_it; + } + + if (has_running_query) { + LOG(INFO) << "has running task, query: " << print_id(running_query->query_id()); + } else if (max_revocable_query) { + queries_list.erase(it_to_remove); + queries_list.insert(queries_list.begin(), max_revocable_query); + + auto revocable_tasks = max_revocable_query->get_revocable_tasks(); + DCHECK(!revocable_tasks.empty()); + + LOG(INFO) << "query: " << print_id(max_revocable_query->query_id()) << ", has " + << revocable_tasks.size() + << " tasks to revoke memory, max revocable size: " << max_revocable_size; + SCOPED_ATTACH_TASK(max_revocable_query.get()); + for (auto* task : revocable_tasks) { + auto st = task->revoke_memory(); + if (!st.ok()) { + max_revocable_query->cancel(st); + break; + } + } + } else if (max_memory_usage_query) { + bool new_is_low_wartermark = false; + bool new_is_high_wartermark = false; + const auto query_id = print_id(max_memory_usage_query->query_id()); + wg->check_mem_used(&new_is_low_wartermark, &new_is_high_wartermark); + if (new_is_high_wartermark) { + if (it_to_remove->elapsed_time() < 2000) { + LOG(INFO) << "memory insufficient and cannot find revocable query, " + "the max usage query: " + << query_id << ", usage: " << max_memory_usage + << ", elapsed: " << it_to_remove->elapsed_time() + << ", wg info: " << wg->debug_string(); + continue; + } + max_memory_usage_query->cancel(Status::InternalError( + "memory insufficient and cannot find revocable query, cancel " + "the " + "biggest usage({}) query({})", + max_memory_usage, query_id)); + queries_list.erase(it_to_remove); + + } else { + LOG(INFO) << "non high water mark, resume " + "the query: " + << query_id << ", usage: " << max_memory_usage + << ", wg info: " << wg->debug_string(); + max_memory_usage_query->set_memory_sufficient(true); + queries_list.erase(it_to_remove); + } + } + } +} + void WorkloadGroupMgr::stop() { for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) { iter->second->try_stop_schedulers(); diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index d8547c3383e..68d4f932de0 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -29,6 +29,7 @@ class CgroupCpuCtl; namespace vectorized { class Block; +class QueryContext; } // namespace vectorized namespace pipeline { @@ -36,6 +37,31 @@ class TaskScheduler; class MultiCoreTaskQueue; } // namespace pipeline +class PausedQuery { +public: + // Use weak ptr to save query ctx, to make sure if the query is cancelled + // the resource will be released + std::weak_ptr<QueryContext> query_ctx_; + std::chrono::system_clock::time_point enqueue_at; + size_t last_mem_usage {0}; + + PausedQuery(std::shared_ptr<QueryContext> query_ctx); + + int64_t elapsed_time() const { + auto now = std::chrono::system_clock::now(); + return std::chrono::duration_cast<std::chrono::milliseconds>(now - enqueue_at).count(); + } + + std::string query_id() const { return query_id_; } + + bool operator<(const PausedQuery& other) const { return query_id_ < other.query_id_; } + + bool operator==(const PausedQuery& other) const { return query_id_ == other.query_id_; } + +private: + std::string query_id_; +}; + class WorkloadGroupMgr { public: WorkloadGroupMgr() = default; @@ -62,11 +88,20 @@ public: void get_wg_resource_usage(vectorized::Block* block); + void add_paused_query(const std::shared_ptr<QueryContext>& query_ctx); + + void handle_paused_queries(); + private: std::shared_mutex _group_mutex; std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups; std::shared_mutex _clear_cgroup_lock; + + // Save per group paused query list, it should be a global structure, not per + // workload group, because we need do some coordinate work globally. + std::mutex _paused_queries_lock; + std::map<WorkloadGroupPtr, std::set<PausedQuery>> _paused_queries_list; }; } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org