github-actions[bot] commented on code in PR #33015: URL: https://github.com/apache/doris/pull/33015#discussion_r1547111706
########## be/src/runtime/query_context.cpp: ########## @@ -293,4 +310,113 @@ return Status::OK(); } +void QueryContext::add_pipeline_profile_x(int f_id, std::shared_ptr<TRuntimeProfileTree> profile) { + std::lock_guard<std::mutex> l(_profile_mutex); + LOG_INFO("Query X {} add pipeline profile, fid {}", print_id(this->_query_id), f_id); + DCHECK(_profile_map_x != nullptr && profile != nullptr) << fmt::format( + "Add pipeline profile failed, query {}, fragment {}", print_id(this->_query_id), f_id); + + if (!_profile_map_x->contains(f_id)) { + LOG_WARNING("Query X {} add pipeline profile, fid {} not found", print_id(this->_query_id), + f_id); + return; + } + + (*_profile_map_x)[f_id].push_back(profile); +} + +void QueryContext::add_fragment_profile_x( + int f_id, const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles) { + LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline profile count {} ", + print_id(this->_query_id), f_id, pipeline_profiles.size()); + + std::lock_guard<std::mutex> l(_profile_mutex); + + DCHECK(_profile_map_x != nullptr) << fmt::format( + "Add pipeline profile failed, query {}, fragment {}", print_id(this->_query_id), f_id); +#ifndef NDEBUG + for (const auto& p : pipeline_profiles) { + DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed, query {}, fragment {}", + print_id(this->_query_id), f_id); + } +#endif + _profile_map_x->insert(std::make_pair(f_id, pipeline_profiles)); +} + +void QueryContext::add_instance_profile(const TUniqueId& i_id, + std::shared_ptr<TRuntimeProfileTree> profile) { + DCHECK(profile != nullptr) << print_id(i_id); + std::lock_guard<std::mutex> lg(_profile_mutex); + DCHECK(_profile_map != nullptr) + << fmt::format("Add pipeline profile failed, query {}, instance {}", + print_id(this->_query_id), print_id(i_id)); + + _profile_map->insert(std::make_pair(i_id, profile)); +} + +void QueryContext::async_report_profile() { + _async_report_profile_x(); + 
_async_report_profile_p(); +} + +void QueryContext::_async_report_profile_p() { Review Comment: warning: method '_async_report_profile_p' can be made const [readability-make-member-function-const] ```suggestion void QueryContext::_async_report_profile_p() const { ``` be/src/runtime/query_context.h:367: ```diff - void _async_report_profile_p(); + void _async_report_profile_p() const; ``` ########## be/src/runtime/query_context.cpp: ########## @@ -293,4 +310,113 @@ return Status::OK(); } +void QueryContext::add_pipeline_profile_x(int f_id, std::shared_ptr<TRuntimeProfileTree> profile) { + std::lock_guard<std::mutex> l(_profile_mutex); + LOG_INFO("Query X {} add pipeline profile, fid {}", print_id(this->_query_id), f_id); + DCHECK(_profile_map_x != nullptr && profile != nullptr) << fmt::format( + "Add pipeline profile failed, query {}, fragment {}", print_id(this->_query_id), f_id); + + if (!_profile_map_x->contains(f_id)) { + LOG_WARNING("Query X {} add pipeline profile, fid {} not found", print_id(this->_query_id), + f_id); + return; + } + + (*_profile_map_x)[f_id].push_back(profile); +} + +void QueryContext::add_fragment_profile_x( + int f_id, const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles) { + LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline profile count {} ", + print_id(this->_query_id), f_id, pipeline_profiles.size()); + + std::lock_guard<std::mutex> l(_profile_mutex); + + DCHECK(_profile_map_x != nullptr) << fmt::format( + "Add pipeline profile failed, query {}, fragment {}", print_id(this->_query_id), f_id); +#ifndef NDEBUG + for (const auto& p : pipeline_profiles) { + DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed, query {}, fragment {}", + print_id(this->_query_id), f_id); + } +#endif + _profile_map_x->insert(std::make_pair(f_id, pipeline_profiles)); +} + +void QueryContext::add_instance_profile(const TUniqueId& i_id, + std::shared_ptr<TRuntimeProfileTree> profile) { + DCHECK(profile != 
nullptr) << print_id(i_id); + std::lock_guard<std::mutex> lg(_profile_mutex); + DCHECK(_profile_map != nullptr) + << fmt::format("Add pipeline profile failed, query {}, instance {}", + print_id(this->_query_id), print_id(i_id)); + + _profile_map->insert(std::make_pair(i_id, profile)); +} + +void QueryContext::async_report_profile() { + _async_report_profile_x(); + _async_report_profile_p(); +} + +void QueryContext::_async_report_profile_p() { + if (enable_pipeline_x_exec()) { + return; + } + + std::lock_guard<std::mutex> lg(_profile_mutex); + LOG_INFO("Query {}, register query profile, instance profile count {}", print_id(_query_id), + _profile_map->size()); + + for (auto& [i_id, i_profile] : *_profile_map) { + ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_instance_profile( + _query_id, this->coord_addr, i_id, i_profile); + } + + _profile_map->clear(); +} + +void QueryContext::_async_report_profile_x() { Review Comment: warning: method '_async_report_profile_x' can be made const [readability-make-member-function-const] ```suggestion void QueryContext::_async_report_profile_x() const { ``` be/src/runtime/query_context.h:368: ```diff - void _async_report_profile_x(); + void _async_report_profile_x() const; ``` ########## be/src/runtime/runtime_query_statistics_mgr.h: ########## @@ -75,9 +82,65 @@ class RuntimeQueryStatiticsMgr { // used for backend_active_tasks void get_active_be_tasks_block(vectorized::Block* block); + void start_report_thread(); + void report_query_profiles_thread(); + void force_report_profile(); + void stop_report_thread(); + + void submit_report_status_task(const TUniqueId& q_id, const TNetworkAddress& coor_addr, + int32 f_id, const TUniqueId& i_id, int be_num, bool done, + Status exec_status); + + void register_instance_profile(const TUniqueId& query_id, const TNetworkAddress& coor_addr, + const TUniqueId& instance_id, + std::shared_ptr<TRuntimeProfileTree> i_profile); + + void register_fragment_profile_x(const TUniqueId& 
query_id, const TNetworkAddress& const_addr, + int32_t fragment_id, + std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles); + using CoorAddrWithFragmentProfileMap = + std::tuple<TNetworkAddress, + std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>>; + + using CoorAddrWithInstanceProfileMap = + std::tuple<TNetworkAddress, + std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>>; + + static TReportExecStatusParams create_report_exec_status_params( + const TUniqueId& q_id, int32 f_id, + const std::vector<std::shared_ptr<TRuntimeProfileTree>>& f_profile); + + static void report_query_profile( + const TUniqueId&, const TNetworkAddress&, Review Comment: warning: parameter 3 is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls] ```suggestion std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>&); ``` ########## be/src/runtime/query_context.cpp: ########## @@ -293,4 +310,113 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) { return Status::OK(); } +void QueryContext::add_pipeline_profile_x(int f_id, std::shared_ptr<TRuntimeProfileTree> profile) { Review Comment: warning: method 'add_pipeline_profile_x' can be made static [readability-convert-member-functions-to-static] be/src/runtime/query_context.h:373: ```diff - void add_pipeline_profile_x(int f_id, std::shared_ptr<TRuntimeProfileTree> profile); + static void add_pipeline_profile_x(int f_id, std::shared_ptr<TRuntimeProfileTree> profile); ``` ########## be/src/pipeline/pipeline_fragment_context.cpp: ########## @@ -993,4 +995,50 @@ std::string PipelineFragmentContext::debug_string() { return fmt::to_string(debug_string_buffer); } +std::vector<std::shared_ptr<TRuntimeProfileTree>> PipelineFragmentContext::collect_profile_x() + const { + std::vector<std::shared_ptr<TRuntimeProfileTree>> res; + 
DCHECK(_query_ctx->enable_pipeline_x_exec() == true) + << fmt::format("Query {} calling a pipeline X function, but its pipeline X is disabled", + print_id(this->_query_id)); + std::stringstream ss; + + for (auto& pipeline_profile : _runtime_state->pipeline_id_to_profile()) { + auto profile_ptr = std::make_shared<TRuntimeProfileTree>(); + pipeline_profile->to_thrift(&(*profile_ptr)); + res.push_back(profile_ptr); + + std::vector<RuntimeProfile*> task_x_profile; + pipeline_profile->get_all_children(&task_x_profile); + for (RuntimeProfile* p : task_x_profile) { + if (p->name().find("PipelineXTask") != std::string::npos) { + ss << p->name() << '\n'; + } else { + ss << '\t' << p->name() << '\n'; + } + } + } + + LOG_INFO("Query X {} fragment {} profile\n{} ", print_id(this->_query_id), this->_fragment_id, + ss.str()); + return res; +} + +std::shared_ptr<TRuntimeProfileTree> PipelineFragmentContext::collect_profile_p() const { + auto res = std::make_shared<TRuntimeProfileTree>(); Review Comment: warning: method 'collect_profile_p' can be made static [readability-convert-member-functions-to-static] be/src/pipeline/pipeline_fragment_context.h:149: ```diff - std::shared_ptr<TRuntimeProfileTree> collect_profile_p() const; + static std::shared_ptr<TRuntimeProfileTree> collect_profile_p() ; ``` ```suggestion t { ``` ########## be/src/service/backend_service.cpp: ########## @@ -1129,4 +1133,51 @@ response.__set_status(Status::NotSupported("warm_up_tablets is not implemented").to_thrift()); } +void BaseBackendService::get_realtime_query_exec_status( + TGetRealTimeQueryExecStatusResponse& _return, + const TGetRealTimeQueryExecStatusRequest& request) { + _return = TGetRealTimeQueryExecStatusResponse(); + + if (!request.__isset.query_id) { + LOG_WARNING("Query_id is empty"); + _return.__set_status(Status::InvalidArgument("query_id is empty").to_thrift()); + return; + } + Review Comment: warning: 'auto fragment_mgr' can be declared as 'auto *fragment_mgr' [readability-qualified-auto] 
```suggestion auto *fragment_mgr = ExecEnv::GetInstance()->fragment_mgr(); ``` ########## be/src/service/backend_service.cpp: ########## @@ -1129,4 +1133,51 @@ void BaseBackendService::warm_up_tablets(TWarmUpTabletsResponse& response, response.__set_status(Status::NotSupported("warm_up_tablets is not implemented").to_thrift()); } +void BaseBackendService::get_realtime_query_exec_status( Review Comment: warning: method 'get_realtime_query_exec_status' can be made static [readability-convert-member-functions-to-static] ```suggestion static void BaseBackendService::get_realtime_query_exec_status(TGetRealTimeQueryExecStatusResponse& _return, ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org