This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 972c8dfe5ae [Refactor]refactor log workload group when query arrives be (#45034) 972c8dfe5ae is described below commit 972c8dfe5ae48bfbe29b9fd0b6ed163a87778fd2 Author: wangbo <wan...@selectdb.com> AuthorDate: Tue Dec 10 15:24:39 2024 +0800 [Refactor]refactor log workload group when query arrives be (#45034) --- be/src/runtime/fragment_mgr.cpp | 33 +++++++++++++--------- be/src/runtime/load_channel.cpp | 2 +- be/src/runtime/query_context.cpp | 3 +- be/src/runtime/query_context.h | 2 +- .../workload_group/workload_group_manager.cpp | 6 ++-- .../workload_group/workload_group_manager.h | 2 +- 6 files changed, 26 insertions(+), 22 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index b1bc42491b5..ce18071fda0 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -648,12 +648,25 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para } if (!query_ctx) { + WorkloadGroupPtr workload_group_ptr = nullptr; + std::string wg_info_str = "Workload Group not set"; + if (params.__isset.workload_groups && !params.workload_groups.empty()) { + uint64_t wg_id = params.workload_groups[0].id; + workload_group_ptr = _exec_env->workload_group_mgr()->get_group(wg_id); + if (workload_group_ptr != nullptr) { + wg_info_str = workload_group_ptr->debug_string(); + } else { + wg_info_str = "set wg but not find it in be"; + } + } + // First time a fragment of a query arrived. print logs. LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord << ", total fragment num on current host: " << params.fragment_num_on_host << ", fe process uuid: " << params.query_options.fe_process_uuid << ", query type: " << params.query_options.query_type - << ", report audit fe:" << params.current_connect_fe; + << ", report audit fe:" << params.current_connect_fe + << ", use wg:" << wg_info_str; // This may be a first fragment request of the query. // Create the query fragments context. @@ -678,19 +691,11 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para _set_scan_concurrency(params, query_ctx.get()); - if (params.__isset.workload_groups && !params.workload_groups.empty()) { - uint64_t tg_id = params.workload_groups[0].id; - WorkloadGroupPtr workload_group_ptr = - _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id); - if (workload_group_ptr != nullptr) { - RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx)); - RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr)); - _exec_env->runtime_query_statistics_mgr()->set_workload_group_id( - print_id(query_id), tg_id); - } else { - LOG(WARNING) << "Query/load id: " << print_id(query_ctx->query_id()) - << "can't find its workload group " << tg_id; - } + if (workload_group_ptr != nullptr) { + RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx)); + query_ctx->set_workload_group(workload_group_ptr); + _exec_env->runtime_query_statistics_mgr()->set_workload_group_id( + print_id(query_id), workload_group_ptr->id()); } // There is some logic in query ctx's dctor, we could not check if exists and delete the // temp query ctx now. For example, the query id maybe removed from workload group's queryset. diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index dd426f1ab81..6dfd5d46eb6 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -59,7 +59,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig fmt::format("(FromLoadChannel)Load#Id={}", _load_id.to_string())); if (wg_id > 0) { WorkloadGroupPtr workload_group_ptr = - ExecEnv::GetInstance()->workload_group_mgr()->get_task_group_by_id(wg_id); + ExecEnv::GetInstance()->workload_group_mgr()->get_group(wg_id); if (workload_group_ptr) { wg_ptr = workload_group_ptr; wg_ptr->add_mem_tracker_limiter(mem_tracker); diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 161964420e9..c777c8100ef 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -323,14 +323,13 @@ ThreadPool* QueryContext::get_memtable_flush_pool() { } } -Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) { +void QueryContext::set_workload_group(WorkloadGroupPtr& tg) { _workload_group = tg; // Should add query first, then the workload group will not be deleted. // see task_group_manager::delete_workload_group_by_ids _workload_group->add_mem_tracker_limiter(query_mem_tracker); _workload_group->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler, &_memtable_flush_pool, &_remote_scan_task_scheduler); - return Status::OK(); } void QueryContext::add_fragment_profile( diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index d557245bf23..35431d54394 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -138,7 +138,7 @@ public: } } - Status set_workload_group(WorkloadGroupPtr& tg); + void set_workload_group(WorkloadGroupPtr& tg); int execution_timeout() const { return _query_options.__isset.execution_timeout ? _query_options.execution_timeout diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 4d32fc8700e..e2c4b8c7f5e 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -86,10 +86,10 @@ void WorkloadGroupMgr::get_related_workload_groups( } } -WorkloadGroupPtr WorkloadGroupMgr::get_task_group_by_id(uint64_t tg_id) { +WorkloadGroupPtr WorkloadGroupMgr::get_group(uint64_t wg_id) { std::shared_lock<std::shared_mutex> r_lock(_group_mutex); - if (_workload_groups.find(tg_id) != _workload_groups.end()) { - return _workload_groups.at(tg_id); + if (_workload_groups.find(wg_id) != _workload_groups.end()) { + return _workload_groups.at(wg_id); } return nullptr; } diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index 18a0687b373..c0eb0dfc0b2 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -55,7 +55,7 @@ public: void delete_workload_group_by_ids(std::set<uint64_t> id_set); - WorkloadGroupPtr get_task_group_by_id(uint64_t tg_id); + WorkloadGroupPtr get_group(uint64_t wg_id); void do_sweep(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org