github-actions[bot] commented on code in PR #33331: URL: https://github.com/apache/doris/pull/33331#discussion_r1554942198
########## be/src/runtime/query_context.cpp: ########## @@ -297,4 +317,147 @@ return Status::OK(); } +void QueryContext::add_fragment_profile_x( + int fragment_id, const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles, + std::shared_ptr<TRuntimeProfileTree> load_channel_profile) { +#ifndef NDEBUG + for (const auto& p : pipeline_profiles) { + DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed, query {}, fragment {}", + print_id(this->_query_id), fragment_id); + } +#endif + + std::lock_guard<std::mutex> l(_profile_mutex); + LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline profile count {} ", + print_id(this->_query_id), fragment_id, pipeline_profiles.size()); + + _profile_map_x.insert(std::make_pair(fragment_id, pipeline_profiles)); + + if (load_channel_profile != nullptr) { + _load_channel_profile_map_x.insert(std::make_pair(fragment_id, load_channel_profile)); + } +} + +void QueryContext::add_instance_profile(const TUniqueId& instance_id, + std::shared_ptr<TRuntimeProfileTree> profile, + std::shared_ptr<TRuntimeProfileTree> load_channel_profile) { + DCHECK(profile != nullptr) << print_id(instance_id); + + std::lock_guard<std::mutex> lg(_profile_mutex); + _profile_map.insert(std::make_pair(instance_id, profile)); + if (load_channel_profile != nullptr) { + _load_channel_profile_map.insert(std::make_pair(instance_id, load_channel_profile)); + } +} + +void QueryContext::_report_query_profile() { + if (!enable_profile()) { + return; + } + + _report_query_profile_x(); + _report_query_profile_non_pipeline(); +} + +void QueryContext::_report_query_profile_non_pipeline() { + if (enable_pipeline_exec()) { + return; + } + + std::lock_guard<std::mutex> lg(_profile_mutex); + LOG_INFO("Query {}, register query profile, instance profile count {}", print_id(_query_id), + _profile_map.size()); + + for (auto& [instance_id, instance_profile] : _profile_map) { + std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr; + if (_load_channel_profile_map.contains(instance_id)) { + load_channel_profile = _load_channel_profile_map[instance_id]; + } + + ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_instance_profile( + _query_id, this->coord_addr, instance_id, instance_profile, load_channel_profile); + } + + ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile(); +} + +void QueryContext::_report_query_profile_x() { Review Comment: warning: method '_report_query_profile_x' can be made const [readability-make-member-function-const] ```suggestion void QueryContext::_report_query_profile_x() const { ``` be/src/runtime/query_context.h:387: ```diff - void _report_query_profile_x(); + void _report_query_profile_x() const; ``` ########## be/src/runtime/runtime_query_statistics_mgr.cpp: ########## @@ -17,14 +17,428 @@ #include "runtime/runtime_query_statistics_mgr.h" +#include <gen_cpp/FrontendService_types.h> +#include <gen_cpp/RuntimeProfile_types.h> +#include <gen_cpp/Status_types.h> +#include <gen_cpp/Types_types.h> +#include <thrift/TApplicationException.h> + +#include <cstdint> +#include <memory> +#include <mutex> +#include <random> +#include <shared_mutex> +#include <string> +#include <tuple> +#include <unordered_map> +#include <vector> + +#include "common/logging.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" +#include "runtime/query_context.h" +#include "service/backend_options.h" #include "util/debug_util.h" #include "util/time.h" +#include "util/uid_util.h" #include "vec/core/block.h" namespace doris { +static Status _do_report_exec_stats_rpc(const TNetworkAddress& coor_addr, + const TReportExecStatusParams& req, + TReportExecStatusResult& res) { + Status client_status; + FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr, + &client_status); + if (!client_status.ok()) { + LOG_WARNING( + "could not get client rpc client of {} when reporting profiles, reason is {}, " + "not reporting, profile will be lost", + PrintThriftNetworkAddress(coor_addr), client_status.to_string()); + return Status::RpcError("Client rpc client failed"); + } + + try { + try { + rpc_client->reportExecStatus(res, req); + } catch (const apache::thrift::transport::TTransportException& e) { + LOG_WARNING("Transport exception from {}, reason: {}, reopening", + PrintThriftNetworkAddress(coor_addr), e.what()); + client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms); + if (!client_status.ok()) { + LOG_WARNING("Reopen failed, reason: {}", client_status.to_string()); + return Status::RpcError("Open rpc client failed"); + } + + rpc_client->reportExecStatus(res, req); + } + } catch (apache::thrift::TApplicationException& e) { + if (e.getType() == e.UNKNOWN_METHOD) { + LOG_WARNING( + "Failed to send statistics to {} due to {}, usually because the frontend " + "is not upgraded, check the version", + PrintThriftNetworkAddress(coor_addr), e.what()); + } else { + LOG_WARNING( + "Failed to send statistics to {}, reason: {}, you can see fe log for " + "details.", + PrintThriftNetworkAddress(coor_addr), e.what()); + } + return Status::RpcError("Send stats failed"); + } catch (std::exception& e) { + LOG_WARNING("Failed to send statistics to {}, reason: {}, you can see fe log for details.", + PrintThriftNetworkAddress(coor_addr), e.what()); + return Status::RpcError("Send stats failed"); + } + + return Status::OK(); +} + +TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_params_x( + const TUniqueId& query_id, + const std::unordered_map<int32, std::vector<std::shared_ptr<TRuntimeProfileTree>>>& + fragment_id_to_profile, + const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profiles) { + TQueryProfile profile; + profile.__set_query_id(query_id); + + std::map<int32_t, std::vector<TDetailedReportParams>> fragment_id_to_profile_req; + + for (const auto& entry : fragment_id_to_profile) { + int32_t fragment_id = entry.first; + const std::vector<std::shared_ptr<TRuntimeProfileTree>>& fragment_profile = entry.second; + std::vector<TDetailedReportParams> detailed_params; + + for (auto pipeline_profile : fragment_profile) { + if (pipeline_profile == nullptr) { + auto msg = fmt::format("Register fragment profile {} {} failed, profile is null", + print_id(query_id), fragment_id); + DCHECK(false) << msg; + LOG_ERROR(msg); + continue; + } + + TDetailedReportParams tmp; + tmp.__set_profile(*pipeline_profile); + // tmp.fragment_instance_id is not needed for pipeline x + detailed_params.push_back(tmp); + } + + fragment_id_to_profile_req.insert(std::make_pair(fragment_id, detailed_params)); + } + + if (fragment_id_to_profile_req.size() == 0) { + LOG_WARNING("No fragment profile found for query {}", print_id(query_id)); + } + + profile.__set_fragment_id_to_profile(fragment_id_to_profile_req); + + std::vector<TRuntimeProfileTree> load_channel_profiles_req; + for (auto load_channel_profile : load_channel_profiles) { + if (load_channel_profile == nullptr) { + auto msg = fmt::format( + "Register fragment profile {} {} failed, load channel profile is null", + print_id(query_id), -1); + DCHECK(false) << msg; + LOG_ERROR(msg); + continue; + } + + load_channel_profiles_req.push_back(*load_channel_profile); + } + + if (load_channel_profiles_req.size() > 0) { + profile.__set_load_channel_profiles(load_channel_profiles_req); + } + + TReportExecStatusParams req; + req.__set_query_profile(profile); + req.__set_query_id(TUniqueId()); + + return req; +} + +TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_params_non_pipeline( + const TUniqueId& query_id, + const std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>& + instance_id_to_profile, + const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profile) { + TQueryProfile profile; + std::vector<TUniqueId> fragment_instance_ids; + std::vector<TRuntimeProfileTree> instance_profiles; + std::vector<TRuntimeProfileTree> load_channel_profiles; + + for (auto entry : instance_id_to_profile) { + TUniqueId instance_id = entry.first; + std::shared_ptr<TRuntimeProfileTree> profile = entry.second; + + if (profile == nullptr) { + auto msg = fmt::format("Register instance profile {} {} failed, profile is null", + print_id(query_id), print_id(instance_id)); + DCHECK(false) << msg; + LOG_ERROR(msg); + continue; + } + + fragment_instance_ids.push_back(instance_id); + instance_profiles.push_back(*profile); + } + + profile.__set_query_id(query_id); + profile.__set_fragment_instance_ids(fragment_instance_ids); + profile.__set_instance_profiles(instance_profiles); + profile.__set_load_channel_profiles(load_channel_profiles); + + TReportExecStatusParams res; + res.__set_query_profile(profile); + res.__set_query_id(TUniqueId()); + return res; +} + +void RuntimeQueryStatiticsMgr::start_report_thread() { + if (started.load()) { + DCHECK(false) << "report thread has been started"; + LOG_ERROR("report thread has been started"); + return; + } + + started.store(true); + + for (size_t i = 0; i < config::report_exec_status_thread_num; ++i) { + this->_report_profile_threads.emplace_back(std::make_unique<std::thread>( + &RuntimeQueryStatiticsMgr::report_query_profiles_thread, this)); + } +} + +void RuntimeQueryStatiticsMgr::report_query_profiles_thread() { + while (true) { + { + std::unique_lock<std::mutex> lock(_report_profile_mutex); + + while (_query_profile_map.empty() && _profile_map_x.empty() && + !_report_profile_thread_stop) { + _report_profile_cv.wait(lock); + } + } + + _report_query_profiles_function(); + + { + std::lock_guard<std::mutex> lg(_report_profile_mutex); + + if (_report_profile_thread_stop) { + LOG_INFO("Report profile thread stopped"); + return; + } + } + } +} + +void RuntimeQueryStatiticsMgr::trigger_report_profile() { Review Comment: warning: method 'trigger_report_profile' can be made static [readability-convert-member-functions-to-static] be/src/runtime/runtime_query_statistics_mgr.h:99: ```diff - void trigger_report_profile(); + static void trigger_report_profile(); ``` ########## be/src/runtime/query_context.h: ########## @@ -351,6 +356,59 @@ class QueryContext { std::mutex _weighted_mem_lock; int64_t _weighted_consumption = 0; int64_t _weighted_limit = 0; + + std::mutex _profile_mutex; + + // when fragment of pipeline x is closed, it will register its profile to this map by using add_fragment_profile_x + // flatten profile of one fragment: + // Pipeline 0 + // PipelineTask 0 + // Operator 1 + // Operator 2 + // Scanner + // PipelineTask 1 + // Operator 1 + // Operator 2 + // Scanner + // Pipeline 1 + // PipelineTask 2 + // Operator 3 + // PipelineTask 3 + // Operator 3 + // fragment_id -> list<profile> + std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> _profile_map_x; + std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>> _load_channel_profile_map_x; + + // instance_id -> profile + std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> _profile_map; + std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> _load_channel_profile_map; + + void _report_query_profile(); + void _report_query_profile_non_pipeline(); + void _report_query_profile_x(); + + std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> + _collect_realtime_query_profile_x() const; + + std::unordered_map<TUniqueId, std::vector<std::shared_ptr<TRuntimeProfileTree>>> + _collect_realtime_query_profile_non_pipeline() const; + +public: + // when fragment of pipeline x is closed, it will register its profile to this map by using add_fragment_profile_x + void add_fragment_profile_x( + int fragment_id, + const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profile, + std::shared_ptr<TRuntimeProfileTree> load_channel_profile); + + void add_instance_profile(const TUniqueId& iid, std::shared_ptr<TRuntimeProfileTree> profile, + std::shared_ptr<TRuntimeProfileTree> load_channel_profile); + + TReportExecStatusParams get_realtime_exec_status_x() const; + + bool enable_profile() const { Review Comment: warning: method 'enable_profile' can be made static [readability-convert-member-functions-to-static] ```suggestion static bool enable_profile() { ``` ########## be/src/runtime/query_context.cpp: ########## @@ -297,4 +317,147 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) { return Status::OK(); } +void QueryContext::add_fragment_profile_x( + int fragment_id, const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles, + std::shared_ptr<TRuntimeProfileTree> load_channel_profile) { +#ifndef NDEBUG + for (const auto& p : pipeline_profiles) { + DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed, query {}, fragment {}", + print_id(this->_query_id), fragment_id); + } +#endif + + std::lock_guard<std::mutex> l(_profile_mutex); + LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline profile count {} ", + print_id(this->_query_id), fragment_id, pipeline_profiles.size()); + + _profile_map_x.insert(std::make_pair(fragment_id, pipeline_profiles)); + + if (load_channel_profile != nullptr) { + _load_channel_profile_map_x.insert(std::make_pair(fragment_id, load_channel_profile)); + } +} + +void QueryContext::add_instance_profile(const TUniqueId& instance_id, + std::shared_ptr<TRuntimeProfileTree> profile, + std::shared_ptr<TRuntimeProfileTree> load_channel_profile) { + DCHECK(profile != nullptr) << print_id(instance_id); + + std::lock_guard<std::mutex> lg(_profile_mutex); + _profile_map.insert(std::make_pair(instance_id, profile)); + if (load_channel_profile != nullptr) { + _load_channel_profile_map.insert(std::make_pair(instance_id, load_channel_profile)); + } +} + +void QueryContext::_report_query_profile() { + if (!enable_profile()) { + return; + } + + _report_query_profile_x(); + _report_query_profile_non_pipeline(); +} + +void QueryContext::_report_query_profile_non_pipeline() { Review Comment: warning: method '_report_query_profile_non_pipeline' can be made const [readability-make-member-function-const] ```suggestion void QueryContext::_report_query_profile_non_pipeline() const { ``` be/src/runtime/query_context.h:386: ```diff - void _report_query_profile_non_pipeline(); + void _report_query_profile_non_pipeline() const; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org