This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 972c8dfe5ae [Refactor]refactor log workload group when query arrives be (#45034)
972c8dfe5ae is described below

commit 972c8dfe5ae48bfbe29b9fd0b6ed163a87778fd2
Author: wangbo <wan...@selectdb.com>
AuthorDate: Tue Dec 10 15:24:39 2024 +0800

    [Refactor]refactor log workload group when query arrives be (#45034)
---
 be/src/runtime/fragment_mgr.cpp                    | 33 +++++++++++++---------
 be/src/runtime/load_channel.cpp                    |  2 +-
 be/src/runtime/query_context.cpp                   |  3 +-
 be/src/runtime/query_context.h                     |  2 +-
 .../workload_group/workload_group_manager.cpp      |  6 ++--
 .../workload_group/workload_group_manager.h        |  2 +-
 6 files changed, 26 insertions(+), 22 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index b1bc42491b5..ce18071fda0 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -648,12 +648,25 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para
             }
 
             if (!query_ctx) {
+                WorkloadGroupPtr workload_group_ptr = nullptr;
+                std::string wg_info_str = "Workload Group not set";
+                if (params.__isset.workload_groups && !params.workload_groups.empty()) {
+                    uint64_t wg_id = params.workload_groups[0].id;
+                    workload_group_ptr = _exec_env->workload_group_mgr()->get_group(wg_id);
+                    if (workload_group_ptr != nullptr) {
+                        wg_info_str = workload_group_ptr->debug_string();
+                    } else {
+                        wg_info_str = "set wg but not find it in be";
+                    }
+                }
+
                 // First time a fragment of a query arrived. print logs.
                 LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord
                           << ", total fragment num on current host: " << params.fragment_num_on_host
                           << ", fe process uuid: " << params.query_options.fe_process_uuid
                           << ", query type: " << params.query_options.query_type
-                          << ", report audit fe:" << params.current_connect_fe;
+                          << ", report audit fe:" << params.current_connect_fe
+                          << ", use wg:" << wg_info_str;
 
                 // This may be a first fragment request of the query.
                 // Create the query fragments context.
@@ -678,19 +691,11 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para
 
                 _set_scan_concurrency(params, query_ctx.get());
 
-                if (params.__isset.workload_groups && !params.workload_groups.empty()) {
-                    uint64_t tg_id = params.workload_groups[0].id;
-                    WorkloadGroupPtr workload_group_ptr =
-                            _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id);
-                    if (workload_group_ptr != nullptr) {
-                        RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
-                        RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
-                        _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
-                                print_id(query_id), tg_id);
-                    } else {
-                        LOG(WARNING) << "Query/load id: " << print_id(query_ctx->query_id())
-                                     << "can't find its workload group " << tg_id;
-                    }
+                if (workload_group_ptr != nullptr) {
+                    RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
+                    query_ctx->set_workload_group(workload_group_ptr);
+                    _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
+                            print_id(query_id), workload_group_ptr->id());
                 }
                 // There is some logic in query ctx's dctor, we could not check if exists and delete the
                 // temp query ctx now. For example, the query id maybe removed from workload group's queryset.
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index dd426f1ab81..6dfd5d46eb6 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -59,7 +59,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig
                 fmt::format("(FromLoadChannel)Load#Id={}", _load_id.to_string()));
         if (wg_id > 0) {
             WorkloadGroupPtr workload_group_ptr =
-                    ExecEnv::GetInstance()->workload_group_mgr()->get_task_group_by_id(wg_id);
+                    ExecEnv::GetInstance()->workload_group_mgr()->get_group(wg_id);
             if (workload_group_ptr) {
                 wg_ptr = workload_group_ptr;
                 wg_ptr->add_mem_tracker_limiter(mem_tracker);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 161964420e9..c777c8100ef 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -323,14 +323,13 @@ ThreadPool* QueryContext::get_memtable_flush_pool() {
     }
 }
 
-Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) {
+void QueryContext::set_workload_group(WorkloadGroupPtr& tg) {
     _workload_group = tg;
     // Should add query first, then the workload group will not be deleted.
     // see task_group_manager::delete_workload_group_by_ids
     _workload_group->add_mem_tracker_limiter(query_mem_tracker);
     _workload_group->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler,
                                          &_memtable_flush_pool, &_remote_scan_task_scheduler);
-    return Status::OK();
 }
 
 void QueryContext::add_fragment_profile(
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index d557245bf23..35431d54394 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -138,7 +138,7 @@ public:
         }
     }
 
-    Status set_workload_group(WorkloadGroupPtr& tg);
+    void set_workload_group(WorkloadGroupPtr& tg);
 
     int execution_timeout() const {
         return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 4d32fc8700e..e2c4b8c7f5e 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -86,10 +86,10 @@ void WorkloadGroupMgr::get_related_workload_groups(
     }
 }
 
-WorkloadGroupPtr WorkloadGroupMgr::get_task_group_by_id(uint64_t tg_id) {
+WorkloadGroupPtr WorkloadGroupMgr::get_group(uint64_t wg_id) {
     std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
-    if (_workload_groups.find(tg_id) != _workload_groups.end()) {
-        return _workload_groups.at(tg_id);
+    if (_workload_groups.find(wg_id) != _workload_groups.end()) {
+        return _workload_groups.at(wg_id);
     }
     return nullptr;
 }
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index 18a0687b373..c0eb0dfc0b2 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -55,7 +55,7 @@ public:
 
     void delete_workload_group_by_ids(std::set<uint64_t> id_set);
 
-    WorkloadGroupPtr get_task_group_by_id(uint64_t tg_id);
+    WorkloadGroupPtr get_group(uint64_t wg_id);
 
     void do_sweep();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to