This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 0217028fad526ebb43f57e974735e7a24c9186dd
Author: yiguolei <676222...@qq.com>
AuthorDate: Tue Sep 10 16:06:45 2024 +0800

    move paused query and handle logic to workload group manager (#40603)
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    ---------
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/common/daemon.cpp                           |   9 +-
 be/src/pipeline/pipeline_task.cpp                  |   8 +-
 be/src/pipeline/task_scheduler.cpp                 | 170 +--------------------
 be/src/pipeline/task_scheduler.h                   |  33 ----
 .../workload_group/workload_group_manager.cpp      | 157 +++++++++++++++++++
 .../workload_group/workload_group_manager.h        |  35 +++++
 6 files changed, 206 insertions(+), 206 deletions(-)

diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 713813b4a33..c3f8d89de82 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -298,15 +298,18 @@ void Daemon::memory_maintenance_thread() {
         // step 6. Refresh weighted memory ratio of workload groups.
         
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();
 
-        // step 7. Analyze blocking queries.
+        // step 7. Handle paused queries (caused by insufficient memory).
+        
doris::ExecEnv::GetInstance()->workload_group_mgr()->handle_paused_queries();
+
+        // step 8. Analyze blocking queries.
         // TODO sort the operators that can spill, wake up the pipeline task 
spill
         // or continue execution according to certain rules or cancel query.
 
-        // step 8. Flush memtable
+        // step 9. Flush memtable
         doris::GlobalMemoryArbitrator::notify_memtable_memory_refresh();
         // TODO notify flush memtable
 
-        // step 9. Jemalloc purge all arena dirty pages
+        // step 10. Jemalloc purge all arena dirty pages
         je_purge_dirty_pages();
     }
 }
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 10309000ced..d328018c27f 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -34,8 +34,10 @@
 #include "pipeline/task_queue.h"
 #include "pipeline/task_scheduler.h"
 #include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
 #include "runtime/query_context.h"
 #include "runtime/thread_context.h"
+#include "runtime/workload_group/workload_group_manager.h"
 #include "util/container_util.hpp"
 #include "util/defer_op.h"
 #include "util/mem_info.h"
@@ -399,7 +401,8 @@ Status PipelineTask::execute(bool* eos) {
                         }
 
                         _memory_sufficient_dependency->block();
-                        
_state->get_query_ctx()->get_pipe_exec_scheduler()->add_paused_task(this);
+                        
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+                                _state->get_query_ctx()->shared_from_this());
                         continue;
                     }
                     has_enough_memory = false;
@@ -436,7 +439,8 @@ Status PipelineTask::execute(bool* eos) {
             LOG(INFO) << "query: " << print_id(query_id) << ", task: " << 
(void*)this
                       << ", insufficient memory. reserve_size: " << 
reserve_size;
             _memory_sufficient_dependency->block();
-            
_state->get_query_ctx()->get_pipe_exec_scheduler()->add_paused_task(this);
+            ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+                    _state->get_query_ctx()->shared_from_this());
             break;
         }
     }
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index a4379f73b64..715feceed98 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -62,8 +62,8 @@ Status TaskScheduler::start() {
     // some for pipeline task running
     // 1 for spill disk query handler
     RETURN_IF_ERROR(ThreadPoolBuilder(_name)
-                            .set_min_threads(cores + 1)
-                            .set_max_threads(cores + 1)
+                            .set_min_threads(cores)
+                            .set_max_threads(cores)
                             .set_max_queue_size(0)
                             .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
                             .build(&_fix_thread_pool));
@@ -71,8 +71,6 @@ Status TaskScheduler::start() {
     for (size_t i = 0; i < cores; ++i) {
         RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); 
}));
     }
-
-    RETURN_IF_ERROR(_fix_thread_pool->submit_func([this] { 
_paused_queries_handler(); }));
     return Status::OK();
 }
 
@@ -199,169 +197,6 @@ void TaskScheduler::_do_work(size_t index) {
     }
 }
 
-void TaskScheduler::add_paused_task(PipelineTask* task) {
-    std::lock_guard<std::mutex> lock(_paused_queries_lock);
-    auto query_ctx_sptr = 
task->runtime_state()->get_query_ctx()->shared_from_this();
-    DCHECK(query_ctx_sptr != nullptr);
-    auto wg = query_ctx_sptr->workload_group();
-    auto&& [it, inserted] = 
_paused_queries_list[wg].emplace(std::move(query_ctx_sptr));
-    if (inserted) {
-        LOG(INFO) << "here insert one new paused query: " << it->query_id()
-                  << ", wg: " << (void*)(wg.get());
-    }
-
-    _paused_queries_cv.notify_all();
-}
-
-/**
- * Strategy 1: A revocable query should not have any running 
task(PipelineTask).
- * strategy 2: If the workload group is below low water mark, we make all 
queries in this wg runnable.
- * strategy 3: Pick the query which has the max revocable size to revoke 
memory.
- * strategy 4: If all queries are not revocable and they all have not any 
running task,
- *             we choose the max memory usage query to cancel.
- */
-void TaskScheduler::_paused_queries_handler() {
-    while (!_need_to_stop) {
-        {
-            std::unique_lock<std::mutex> lock(_paused_queries_lock);
-            if (_paused_queries_list.empty()) {
-                _paused_queries_cv.wait(lock, [&] { return 
!_paused_queries_list.empty(); });
-            }
-
-            if (_need_to_stop) {
-                break;
-            }
-
-            if (_paused_queries_list.empty()) {
-                continue;
-            }
-
-            for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end();) {
-                auto& queries_list = it->second;
-                const auto& wg = it->first;
-                if (queries_list.empty()) {
-                    LOG(INFO) << "wg: " << wg->debug_string() << " has no 
paused query";
-                    it = _paused_queries_list.erase(it);
-                    continue;
-                }
-
-                bool is_low_wartermark = false;
-                bool is_high_wartermark = false;
-
-                wg->check_mem_used(&is_low_wartermark, &is_high_wartermark);
-
-                if (!is_low_wartermark && !is_high_wartermark) {
-                    LOG(INFO) << "**** there are " << queries_list.size() << " 
to resume";
-                    for (const auto& query : queries_list) {
-                        LOG(INFO) << "**** resume paused query: " << 
query.query_id();
-                        query.query_ctx->set_memory_sufficient(true);
-                    }
-
-                    queries_list.clear();
-                    it = _paused_queries_list.erase(it);
-                    continue;
-                } else {
-                    ++it;
-                }
-
-                std::shared_ptr<QueryContext> max_revocable_query;
-                std::shared_ptr<QueryContext> max_memory_usage_query;
-                std::shared_ptr<QueryContext> running_query;
-                bool has_running_query = false;
-                size_t max_revocable_size = 0;
-                size_t max_memory_usage = 0;
-                auto it_to_remove = queries_list.end();
-
-                for (auto query_it = queries_list.begin(); query_it != 
queries_list.end();) {
-                    const auto& query_ctx = query_it->query_ctx;
-                    size_t revocable_size = 0;
-                    size_t memory_usage = 0;
-                    bool has_running_task = false;
-
-                    if (query_ctx->is_cancelled()) {
-                        LOG(INFO) << "query: " << 
print_id(query_ctx->query_id())
-                                  << "was canceled, remove from paused list";
-                        query_it = queries_list.erase(query_it);
-                        continue;
-                    }
-
-                    query_ctx->get_revocable_info(&revocable_size, 
&memory_usage,
-                                                  &has_running_task);
-                    if (has_running_task) {
-                        has_running_query = true;
-                        running_query = query_ctx;
-                        break;
-                    } else if (revocable_size > max_revocable_size) {
-                        max_revocable_query = query_ctx;
-                        max_revocable_size = revocable_size;
-                        it_to_remove = query_it;
-                    } else if (memory_usage > max_memory_usage) {
-                        max_memory_usage_query = query_ctx;
-                        max_memory_usage = memory_usage;
-                        it_to_remove = query_it;
-                    }
-
-                    ++query_it;
-                }
-
-                if (has_running_query) {
-                    LOG(INFO) << "has running task, query: " << 
print_id(running_query->query_id());
-                    
std::this_thread::sleep_for(std::chrono::milliseconds(500));
-                } else if (max_revocable_query) {
-                    queries_list.erase(it_to_remove);
-                    queries_list.insert(queries_list.begin(), 
max_revocable_query);
-
-                    auto revocable_tasks = 
max_revocable_query->get_revocable_tasks();
-                    DCHECK(!revocable_tasks.empty());
-
-                    LOG(INFO) << "query: " << 
print_id(max_revocable_query->query_id()) << ", has "
-                              << revocable_tasks.size()
-                              << " tasks to revoke memory, max revocable size: 
"
-                              << max_revocable_size;
-                    SCOPED_ATTACH_TASK(max_revocable_query.get());
-                    for (auto* task : revocable_tasks) {
-                        auto st = task->revoke_memory();
-                        if (!st.ok()) {
-                            max_revocable_query->cancel(st);
-                            break;
-                        }
-                    }
-                } else if (max_memory_usage_query) {
-                    bool new_is_low_wartermark = false;
-                    bool new_is_high_wartermark = false;
-                    const auto query_id = 
print_id(max_memory_usage_query->query_id());
-                    wg->check_mem_used(&new_is_low_wartermark, 
&new_is_high_wartermark);
-                    if (new_is_high_wartermark) {
-                        if (it_to_remove->elapsed_time() < 2000) {
-                            LOG(INFO) << "memory insufficient and cannot find 
revocable query, "
-                                         "the max usage query: "
-                                      << query_id << ", usage: " << 
max_memory_usage
-                                      << ", elapsed: " << 
it_to_remove->elapsed_time()
-                                      << ", wg info: " << wg->debug_string();
-                            continue;
-                        }
-                        max_memory_usage_query->cancel(Status::InternalError(
-                                "memory insufficient and cannot find revocable 
query, cancel "
-                                "the "
-                                "biggest usage({}) query({})",
-                                max_memory_usage, query_id));
-                        queries_list.erase(it_to_remove);
-
-                    } else {
-                        LOG(INFO) << "non high water mark, resume "
-                                     "the query: "
-                                  << query_id << ", usage: " << 
max_memory_usage
-                                  << ", wg info: " << wg->debug_string();
-                        max_memory_usage_query->set_memory_sufficient(true);
-                        queries_list.erase(it_to_remove);
-                    }
-                }
-            }
-        }
-        std::this_thread::sleep_for(std::chrono::milliseconds(100));
-    }
-}
-
 void TaskScheduler::stop() {
     if (!_shutdown) {
         if (_task_queue) {
@@ -369,7 +204,6 @@ void TaskScheduler::stop() {
         }
         if (_fix_thread_pool) {
             _need_to_stop = true;
-            _paused_queries_cv.notify_all();
             _fix_thread_pool->shutdown();
             _fix_thread_pool->wait();
         }
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index bed38887037..8ad510fd8a8 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -46,31 +46,6 @@ class TaskQueue;
 
 namespace doris::pipeline {
 
-struct PausedQuery {
-    std::shared_ptr<QueryContext> query_ctx;
-    std::chrono::system_clock::time_point enqueue_at;
-    size_t last_mem_usage {0};
-
-    PausedQuery(std::shared_ptr<QueryContext> query_ctx_)
-            : query_ctx(std::move(query_ctx_)), 
_query_id(print_id(query_ctx->query_id())) {
-        enqueue_at = std::chrono::system_clock::now();
-    }
-
-    int64_t elapsed_time() const {
-        auto now = std::chrono::system_clock::now();
-        return std::chrono::duration_cast<std::chrono::milliseconds>(now - 
enqueue_at).count();
-    }
-
-    std::string query_id() const { return _query_id; }
-
-    bool operator<(const PausedQuery& other) const { return _query_id < 
other._query_id; }
-
-    bool operator==(const PausedQuery& other) const { return _query_id == 
other._query_id; }
-
-private:
-    std::string _query_id;
-};
-
 class TaskScheduler {
 public:
     TaskScheduler(ExecEnv* exec_env, std::shared_ptr<TaskQueue> task_queue, 
std::string name,
@@ -89,8 +64,6 @@ public:
 
     std::vector<int> thread_debug_info() { return 
_fix_thread_pool->debug_info(); }
 
-    void add_paused_task(PipelineTask* task);
-
 private:
     std::unique_ptr<ThreadPool> _fix_thread_pool;
     std::shared_ptr<TaskQueue> _task_queue;
@@ -99,12 +72,6 @@ private:
     std::string _name;
     CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
 
-    std::map<WorkloadGroupPtr, std::set<PausedQuery>> _paused_queries_list;
-    std::mutex _paused_queries_lock;
-    std::condition_variable _paused_queries_cv;
-
     void _do_work(size_t index);
-
-    void _paused_queries_handler();
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 65a8e3685c8..7b1a0ad87db 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -34,6 +34,11 @@
 
 namespace doris {
 
+// Builds a paused-set entry for `query_ctx`: keeps only a weak reference to the
+// context, stamps the entry with the current time and caches the printed query id.
+PausedQuery::PausedQuery(std::shared_ptr<QueryContext> query_ctx)
+        : query_ctx_(query_ctx),
+          enqueue_at(std::chrono::system_clock::now()),
+          query_id_(print_id(query_ctx->query_id())) {}
+
 WorkloadGroupPtr WorkloadGroupMgr::get_or_create_workload_group(
         const WorkloadGroupInfo& workload_group_info) {
     {
@@ -270,6 +275,158 @@ void 
WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
     }
 }
 
+// Records `query_ctx` as paused under its workload group. The per-group container is a
+// std::set ordered by query id, so adding a query that is already present changes nothing
+// and is not logged.
+void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& query_ctx) {
+    std::lock_guard<std::mutex> guard(_paused_queries_lock);
+    DCHECK(query_ctx != nullptr);
+    auto wg = query_ctx->workload_group();
+    // operator[] creates the group's set on first use.
+    auto& paused_in_group = _paused_queries_list[wg];
+    const auto result = paused_in_group.emplace(query_ctx);
+    if (!result.second) {
+        return;
+    }
+    LOG(INFO) << "here insert one new paused query: " << result.first->query_id()
+              << ", wg: " << (void*)(wg.get());
+}
+
+/**
+ * Runs one pass over every workload group that currently has paused queries and decides,
+ * per group, whether to resume, revoke memory from, or cancel a query.
+ * _paused_queries_lock is held for the whole pass.
+ *
+ * Strategy 1: A revocable query should not have any running task(PipelineTask).
+ * Strategy 2: If the workload group is below low water mark, we make all queries in this wg
+ *             runnable.
+ * Strategy 3: Pick the query which has the max revocable size to revoke memory.
+ * Strategy 4: If all queries are not revocable and they all have not any running task,
+ *             we choose the max memory usage query to cancel.
+ */
+void WorkloadGroupMgr::handle_paused_queries() {
+    std::unique_lock<std::mutex> lock(_paused_queries_lock);
+    if (_paused_queries_list.empty()) {
+        return;
+    }
+
+    for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) {
+        auto& queries_list = it->second;
+        const auto& wg = it->first;
+        if (queries_list.empty()) {
+            LOG(INFO) << "wg: " << wg->debug_string() << " has no paused query";
+            it = _paused_queries_list.erase(it);
+            continue;
+        }
+
+        bool is_low_wartermark = false;
+        bool is_high_wartermark = false;
+
+        wg->check_mem_used(&is_low_wartermark, &is_high_wartermark);
+
+        if (!is_low_wartermark && !is_high_wartermark) {
+            // Neither water mark is reported: wake up every paused query of this group
+            // and drop the group's entry.
+            LOG(INFO) << "**** there are " << queries_list.size() << " to resume";
+            for (const auto& query : queries_list) {
+                LOG(INFO) << "**** resume paused query: " << query.query_id();
+                auto query_ctx = query.query_ctx_.lock();
+                if (query_ctx != nullptr) {
+                    query_ctx->set_memory_sufficient(true);
+                }
+            }
+
+            queries_list.clear();
+            it = _paused_queries_list.erase(it);
+            continue;
+        } else {
+            // Advance now; `queries_list` and `wg` still refer to the current group, and
+            // any `continue` below moves straight on to the next group.
+            ++it;
+        }
+
+        std::shared_ptr<QueryContext> max_revocable_query;
+        std::shared_ptr<QueryContext> max_memory_usage_query;
+        std::shared_ptr<QueryContext> running_query;
+        bool has_running_query = false;
+        size_t max_revocable_size = 0;
+        size_t max_memory_usage = 0;
+        // Each candidate keeps its own iterator. With a single shared iterator, a query
+        // that became the max-memory candidate after a revocable one had been chosen would
+        // be the entry erased in the revoke branch, i.e. it would leave the paused set
+        // without ever being resumed or cancelled.
+        auto max_revocable_it = queries_list.end();
+        auto max_memory_usage_it = queries_list.end();
+
+        for (auto query_it = queries_list.begin(); query_it != queries_list.end();) {
+            const auto query_ctx = query_it->query_ctx_.lock();
+            // The query finished while it was in the paused list.
+            if (query_ctx == nullptr) {
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+            size_t revocable_size = 0;
+            size_t memory_usage = 0;
+            bool has_running_task = false;
+
+            if (query_ctx->is_cancelled()) {
+                LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+                          << "was canceled, remove from paused list";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+
+            query_ctx->get_revocable_info(&revocable_size, &memory_usage, &has_running_task);
+            if (has_running_task) {
+                // A query with a running task stops the scan; nothing is done for this
+                // group in this pass.
+                has_running_query = true;
+                running_query = query_ctx;
+                break;
+            } else if (revocable_size > max_revocable_size) {
+                max_revocable_query = query_ctx;
+                max_revocable_size = revocable_size;
+                max_revocable_it = query_it;
+            } else if (memory_usage > max_memory_usage) {
+                max_memory_usage_query = query_ctx;
+                max_memory_usage = memory_usage;
+                max_memory_usage_it = query_it;
+            }
+
+            ++query_it;
+        }
+
+        if (has_running_query) {
+            LOG(INFO) << "has running task, query: " << print_id(running_query->query_id());
+        } else if (max_revocable_query) {
+            // Replace the entry with a freshly constructed PausedQuery, which restarts its
+            // enqueue_at timestamp.
+            queries_list.erase(max_revocable_it);
+            queries_list.insert(queries_list.begin(), max_revocable_query);
+
+            auto revocable_tasks = max_revocable_query->get_revocable_tasks();
+            DCHECK(!revocable_tasks.empty());
+
+            LOG(INFO) << "query: " << print_id(max_revocable_query->query_id()) << ", has "
+                      << revocable_tasks.size()
+                      << " tasks to revoke memory, max revocable size: " << max_revocable_size;
+            SCOPED_ATTACH_TASK(max_revocable_query.get());
+            for (auto* task : revocable_tasks) {
+                auto st = task->revoke_memory();
+                if (!st.ok()) {
+                    // Revoking failed: cancel the query with that status and stop.
+                    max_revocable_query->cancel(st);
+                    break;
+                }
+            }
+        } else if (max_memory_usage_query) {
+            bool new_is_low_wartermark = false;
+            bool new_is_high_wartermark = false;
+            const auto query_id = print_id(max_memory_usage_query->query_id());
+            // Re-check the group: its usage may have changed since the start of the pass.
+            wg->check_mem_used(&new_is_low_wartermark, &new_is_high_wartermark);
+            if (new_is_high_wartermark) {
+                // Leave the query paused until it has waited at least 2000 ms.
+                if (max_memory_usage_it->elapsed_time() < 2000) {
+                    LOG(INFO) << "memory insufficient and cannot find revocable query, "
+                                 "the max usage query: "
+                              << query_id << ", usage: " << max_memory_usage
+                              << ", elapsed: " << max_memory_usage_it->elapsed_time()
+                              << ", wg info: " << wg->debug_string();
+                    continue;
+                }
+                max_memory_usage_query->cancel(Status::InternalError(
+                        "memory insufficient and cannot find revocable query, cancel "
+                        "the "
+                        "biggest usage({}) query({})",
+                        max_memory_usage, query_id));
+                queries_list.erase(max_memory_usage_it);
+
+            } else {
+                LOG(INFO) << "non high water mark, resume "
+                             "the query: "
+                          << query_id << ", usage: " << max_memory_usage
+                          << ", wg info: " << wg->debug_string();
+                max_memory_usage_query->set_memory_sufficient(true);
+                queries_list.erase(max_memory_usage_it);
+            }
+        }
+    }
+}
+
 void WorkloadGroupMgr::stop() {
     for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); 
iter++) {
         iter->second->try_stop_schedulers();
diff --git a/be/src/runtime/workload_group/workload_group_manager.h 
b/be/src/runtime/workload_group/workload_group_manager.h
index d8547c3383e..68d4f932de0 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -29,6 +29,7 @@ class CgroupCpuCtl;
 
 namespace vectorized {
 class Block;
+class QueryContext;
 } // namespace vectorized
 
 namespace pipeline {
@@ -36,6 +37,31 @@ class TaskScheduler;
 class MultiCoreTaskQueue;
 } // namespace pipeline
 
+// One entry of a workload group's paused-query set.
+class PausedQuery {
+public:
+    // Held weakly so that a paused entry does not keep the query context alive; once the
+    // query is cancelled or finished its resources can still be released.
+    std::weak_ptr<QueryContext> query_ctx_;
+    // Set by the constructor to the time the entry was created.
+    std::chrono::system_clock::time_point enqueue_at;
+    size_t last_mem_usage {0};
+
+    // Deliberately not `explicit`: WorkloadGroupMgr::handle_paused_queries inserts a
+    // std::shared_ptr<QueryContext> directly into a std::set<PausedQuery>.
+    PausedQuery(std::shared_ptr<QueryContext> query_ctx);
+
+    // Milliseconds elapsed since enqueue_at.
+    int64_t elapsed_time() const {
+        const auto waited = std::chrono::system_clock::now() - enqueue_at;
+        return std::chrono::duration_cast<std::chrono::milliseconds>(waited).count();
+    }
+
+    std::string query_id() const { return query_id_; }
+
+    // Ordering and equality compare only the printed query id.
+    bool operator<(const PausedQuery& rhs) const { return query_id_ < rhs.query_id_; }
+
+    bool operator==(const PausedQuery& rhs) const { return query_id_ == rhs.query_id_; }
+
+private:
+    std::string query_id_;
+};
+
 class WorkloadGroupMgr {
 public:
     WorkloadGroupMgr() = default;
@@ -62,11 +88,20 @@ public:
 
     void get_wg_resource_usage(vectorized::Block* block);
 
+    void add_paused_query(const std::shared_ptr<QueryContext>& query_ctx);
+
+    void handle_paused_queries();
+
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;
 
     std::shared_mutex _clear_cgroup_lock;
+
+    // Paused queries, grouped by workload group. This is one global structure rather than
+    // a member of each workload group because handling them needs global coordination.
+    std::mutex _paused_queries_lock;
+    std::map<WorkloadGroupPtr, std::set<PausedQuery>> _paused_queries_list;
 };
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to