github-actions[bot] commented on code in PR #33015: URL: https://github.com/apache/doris/pull/33015#discussion_r1544120481
########## be/src/pipeline/pipeline_fragment_context.cpp: ########## @@ -981,4 +992,50 @@ std::string PipelineFragmentContext::debug_string() { return fmt::to_string(debug_string_buffer); } +std::vector<profile::TRuntimeProfilePtr> PipelineFragmentContext::collect_profile_x() const { + std::vector<profile::TRuntimeProfilePtr> res; + DCHECK(_query_ctx->enable_pipeline_x_exec() == true) + << fmt::format("Query {} calling a pipeline X function, but its pipeline X is disabled", + print_id(this->_query_id)); + + std::stringstream ss; + + for (auto& pipeline_profile : _runtime_state->pipeline_id_to_profile()) { + auto profile_ptr = std::make_shared<TRuntimeProfileTree>(); + pipeline_profile->to_thrift(&(*profile_ptr)); + res.push_back(profile_ptr); + + std::vector<RuntimeProfile*> task_x_profile; + pipeline_profile->get_all_children(&task_x_profile); + for (RuntimeProfile* p : task_x_profile) { + if (p->name().find("PipelineXTask") != std::string::npos) { + ss << p->name() << '\n'; + } else { + ss << '\t' << p->name() << '\n'; + } + } + } + + LOG_INFO("Query X {} fragment {} profile\n{} ", print_id(this->_query_id), this->_fragment_id, + ss.str()); + return res; +} + +profile::TRuntimeProfilePtr PipelineFragmentContext::collect_profile() const { + profile::TRuntimeProfilePtr res = std::make_shared<TRuntimeProfileTree>(); Review Comment: warning: method 'collect_profile' can be made static [readability-convert-member-functions-to-static] be/src/pipeline/pipeline_fragment_context.h:150: ```diff - profile::TRuntimeProfilePtr collect_profile() const; + static profile::TRuntimeProfilePtr collect_profile() ; ``` ```suggestion t { ``` ########## be/src/runtime/fragment_mgr.cpp: ########## @@ -193,7 +193,7 @@ Status FragmentMgr::trigger_pipeline_context_report( // it is only invoked from the executor's reporting thread. // Also, the reported status will always reflect the most recent execution status, // including the final status when execution finishes. 
-void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { +void FragmentMgr::report_status_callback(const ReportStatusRequest& req) { Review Comment: warning: function 'report_status_callback' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp void FragmentMgr::report_status_callback(const ReportStatusRequest& req) { ^ ``` <details> <summary>Additional context</summary> **be/src/runtime/fragment_mgr.cpp:195:** 264 lines including whitespace and comments (threshold 80) ```cpp void FragmentMgr::report_status_callback(const ReportStatusRequest& req) { ^ ``` </details> ########## be/src/runtime/fragment_mgr.cpp: ########## @@ -1546,4 +1577,17 @@ } } +void FragmentMgr::async_get_query_statics(const TUniqueId& query_id) { + std::lock_guard<std::mutex> lock(_lock); + + auto iter = _query_ctx_map.find(query_id); + + if (iter == _query_ctx_map.end()) { + throw doris::Exception(ErrorCode::NOT_FOUND, "query id not found"); + } + + iter->second->async_report_profile(); + return; +} Review Comment: warning: redundant return statement at the end of a function with a void return type [readability-redundant-control-flow] ```suggestion } ``` ########## be/src/runtime/fragment_mgr.cpp: ########## @@ -1546,4 +1577,17 @@ } } +void FragmentMgr::async_get_query_statics(const TUniqueId& query_id) { Review Comment: warning: method 'async_get_query_statics' can be made static [readability-convert-member-functions-to-static] ```suggestion static void FragmentMgr::async_get_query_statics(const TUniqueId& query_id) { ``` ########## be/src/pipeline/pipeline_fragment_context.h: ########## @@ -17,21 +17,25 @@ #pragma once +#include <gen_cpp/RuntimeProfile_types.h> Review Comment: warning: 'gen_cpp/RuntimeProfile_types.h' file not found [clang-diagnostic-error] ```cpp #include <gen_cpp/RuntimeProfile_types.h> ^ ``` ########## be/src/runtime/profile/profile.h: ########## @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one 
+// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/RuntimeProfile_types.h> Review Comment: warning: 'gen_cpp/RuntimeProfile_types.h' file not found [clang-diagnostic-error] ```cpp #include <gen_cpp/RuntimeProfile_types.h> ^ ``` ########## be/src/runtime/query_context.h: ########## @@ -18,15 +18,18 @@ #pragma once #include <gen_cpp/PaloInternalService_types.h> Review Comment: warning: 'gen_cpp/PaloInternalService_types.h' file not found [clang-diagnostic-error] ```cpp #include <gen_cpp/PaloInternalService_types.h> ^ ``` ########## be/src/runtime/query_context.cpp: ########## @@ -281,4 +293,77 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) { return Status::OK(); } +void QueryContext::async_report_profile_x() { Review Comment: warning: method 'async_report_profile_x' can be made const [readability-make-member-function-const] ```suggestion void QueryContext::async_report_profile_x() const { ``` be/src/runtime/query_context.h:337: ```diff - void async_report_profile_x(); + void async_report_profile_x() const; ``` ########## be/src/service/backend_service.cpp: ########## @@ -1125,4 +1128,39 @@ void BaseBackendService::warm_up_tablets(TWarmUpTabletsResponse& response, response.__set_status(Status::NotSupported("warm_up_tablets is not 
implemented").to_thrift()); } +void BaseBackendService::async_get_query_statics(TAsyncGetQueryStaticsResponse& _return, Review Comment: warning: method 'async_get_query_statics' can be made static [readability-convert-member-functions-to-static] ```suggestion static void BaseBackendService::async_get_query_statics(TAsyncGetQueryStaticsResponse& _return, ``` ########## be/src/runtime/runtime_query_statistics_mgr.h: ########## @@ -18,10 +18,15 @@ #pragma once #include <gen_cpp/Data_types.h> Review Comment: warning: 'gen_cpp/Data_types.h' file not found [clang-diagnostic-error] ```cpp #include <gen_cpp/Data_types.h> ^ ``` ########## be/src/service/backend_service.cpp: ########## @@ -1125,4 +1128,39 @@ response.__set_status(Status::NotSupported("warm_up_tablets is not implemented").to_thrift()); } +void BaseBackendService::async_get_query_statics(TAsyncGetQueryStaticsResponse& _return, + const TAsyncGetQueryStaticsRequest& request) { + _return = TAsyncGetQueryStaticsResponse(); + + if (!request.__isset.query_id) { + LOG_WARNING("Query_id is empty"); + _return.__set_status(Status::InvalidArgument("query_id is empty").to_thrift()); + return; + } + + auto fragment_mgr = ExecEnv::GetInstance()->fragment_mgr(); + if (fragment_mgr == nullptr) { + LOG_ERROR("Fragment manager is has not been created"); + _return.__set_status( + Status::InternalError("Fragment manager has not been created").to_thrift()); + return; + } + + try { + fragment_mgr->async_get_query_statics(request.query_id); + + ExecEnv::GetInstance()->runtime_query_statistics_mgr()->force_report_profile(); + + LOG_INFO("Async get {} query statics finished", print_id(request.query_id)); + } catch (const doris::Exception& e) { + LOG_WARNING("Failed to async get {} query statics. 
error: {}", e.what()); + _return.__set_status( + Status::NotFound("Query {} not found", print_id(request.query_id)).to_thrift()); + return; + } + + _return.__set_status(Status::OK().to_thrift()); + return; +} Review Comment: warning: redundant return statement at the end of a function with a void return type [readability-redundant-control-flow] ```suggestion } ``` ########## be/src/runtime/query_context.cpp: ########## @@ -281,4 +293,77 @@ return Status::OK(); } +void QueryContext::async_report_profile_x() { + if (!enable_pipeline_x_exec()) { + return; + } + + std::lock_guard<std::mutex> lg(_profile_mutex); + LOG_INFO( + "Pipeline x query context, register query profile, query {}, fragment profile count {}", + print_id(_query_id), _profile_map_x.size()); + + for (auto& [fid, f_profile] : _profile_map_x) { + auto tmp_f_profile = std::make_shared<profile::FragmentProfileX>(); + + for (auto p_profile : f_profile.second) { + tmp_f_profile->pipeline_profiles.push_back( + std::make_shared<profile::PipelineProfileX>(fid, f_profile.first, p_profile)); + } + + ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile_x( + _query_id, fid, this->coord_addr, tmp_f_profile); + } + + _profile_map_x.clear(); +} + +void QueryContext::add_pipeline_profile_x(int f_id, bool finished, + profile::TRuntimeProfilePtr profile) { + std::lock_guard<std::mutex> l(_profile_mutex); + LOG_INFO("Query X {} add pipeline profile, fid {}", print_id(this->_query_id), f_id); + _profile_map_x[f_id].first = finished; + _profile_map_x[f_id].second.push_back(profile); +} + +void QueryContext::add_fragment_profile_x( + int f_id, bool finished, const std::vector<profile::TRuntimeProfilePtr>& pipeline_profile) { + LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline profile count {} ", + print_id(this->_query_id), f_id, pipeline_profile.size()); + + std::lock_guard<std::mutex> l(_profile_mutex); + _profile_map_x[f_id] = std::make_pair(finished, pipeline_profile); 
+} + +void QueryContext::add_instance_profile(const TUniqueId& iid, bool finished, + profile::TRuntimeProfilePtr profile) { + // LOG_INFO("Query {} add instance profile, iid {}, finished {}", print_id(this->_query_id), + // print_id(iid), finished); + DCHECK(profile != nullptr) << print_id(iid); + std::lock_guard<std::mutex> lg(_profile_mutex); + _profile_map[print_id(iid)] = + std::make_pair(finished, std::make_shared<profile::InstanceProfile>(iid, profile)); +} + +void QueryContext::async_report_profile() { Review Comment: warning: method 'async_report_profile' can be made const [readability-make-member-function-const] ```suggestion void QueryContext::async_report_profile() const { ``` be/src/runtime/query_context.h:344: ```diff - void async_report_profile(); + void async_report_profile() const; ``` ########## be/src/service/backend_service.cpp: ########## @@ -1125,4 +1128,39 @@ response.__set_status(Status::NotSupported("warm_up_tablets is not implemented").to_thrift()); } +void BaseBackendService::async_get_query_statics(TAsyncGetQueryStaticsResponse& _return, + const TAsyncGetQueryStaticsRequest& request) { + _return = TAsyncGetQueryStaticsResponse(); + + if (!request.__isset.query_id) { + LOG_WARNING("Query_id is empty"); + _return.__set_status(Status::InvalidArgument("query_id is empty").to_thrift()); + return; + } + + auto fragment_mgr = ExecEnv::GetInstance()->fragment_mgr(); Review Comment: warning: 'auto fragment_mgr' can be declared as 'auto *fragment_mgr' [readability-qualified-auto] ```suggestion auto *fragment_mgr = ExecEnv::GetInstance()->fragment_mgr(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org