github-actions[bot] commented on code in PR #33015:
URL: https://github.com/apache/doris/pull/33015#discussion_r1544457074


##########
be/src/runtime/query_context.cpp:
##########
@@ -281,4 +295,145 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& 
tg) {
     return Status::OK();
 }
 
+void QueryContext::add_pipeline_profile_x(int f_id, bool finished,
+                                          profile::TRuntimeProfilePtr profile) 
{
+    std::lock_guard<std::mutex> l(_profile_mutex);
+    LOG_INFO("Query X {} add pipeline profile, fid {}", 
print_id(this->_query_id), f_id);
+    _profile_map_x[f_id].first = finished;
+    _profile_map_x[f_id].second.push_back(profile);
+}
+
+void QueryContext::add_fragment_profile_x(
+        int f_id, bool finished, const 
std::vector<profile::TRuntimeProfilePtr>& pipeline_profile) {
+    LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline 
profile count {} ",
+             print_id(this->_query_id), f_id, pipeline_profile.size());
+
+    std::lock_guard<std::mutex> l(_profile_mutex);
+    _profile_map_x[f_id] = std::make_pair(finished, pipeline_profile);
+}
+
+void QueryContext::add_instance_profile(const TUniqueId& iid, bool finished,
+                                        profile::TRuntimeProfilePtr profile) {
+    // LOG_INFO("Query {} add instance profile, iid {}, finished {}", 
print_id(this->_query_id),
+    //          print_id(iid), finished);
+    DCHECK(profile != nullptr) << print_id(iid);
+    std::lock_guard<std::mutex> lg(_profile_mutex);
+    _profile_map[print_id(iid)] =
+            std::make_pair(finished, 
std::make_shared<profile::InstanceProfile>(iid, profile));
+}
+
+void QueryContext::async_report_profile() {
+    _async_report_profile_x();
+    _async_report_profile_p();
+}
+
+void QueryContext::_async_report_profile_p() {

Review Comment:
   warning: method '_async_report_profile_p' can be made const 
[readability-make-member-function-const]
   
   ```suggestion
   void QueryContext::_async_report_profile_p() const {
   ```
   
   be/src/runtime/query_context.h:339:
   ```diff
   -     void _async_report_profile_p();
   +     void _async_report_profile_p() const;
   ```
   



##########
be/src/runtime/query_context.cpp:
##########
@@ -281,4 +295,145 @@
     return Status::OK();
 }
 
+void QueryContext::add_pipeline_profile_x(int f_id, bool finished,
+                                          profile::TRuntimeProfilePtr profile) 
{
+    std::lock_guard<std::mutex> l(_profile_mutex);
+    LOG_INFO("Query X {} add pipeline profile, fid {}", 
print_id(this->_query_id), f_id);
+    _profile_map_x[f_id].first = finished;
+    _profile_map_x[f_id].second.push_back(profile);
+}
+
+void QueryContext::add_fragment_profile_x(
+        int f_id, bool finished, const 
std::vector<profile::TRuntimeProfilePtr>& pipeline_profile) {
+    LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline 
profile count {} ",
+             print_id(this->_query_id), f_id, pipeline_profile.size());
+
+    std::lock_guard<std::mutex> l(_profile_mutex);
+    _profile_map_x[f_id] = std::make_pair(finished, pipeline_profile);
+}
+
+void QueryContext::add_instance_profile(const TUniqueId& iid, bool finished,
+                                        profile::TRuntimeProfilePtr profile) {
+    // LOG_INFO("Query {} add instance profile, iid {}, finished {}", 
print_id(this->_query_id),
+    //          print_id(iid), finished);
+    DCHECK(profile != nullptr) << print_id(iid);
+    std::lock_guard<std::mutex> lg(_profile_mutex);
+    _profile_map[print_id(iid)] =
+            std::make_pair(finished, 
std::make_shared<profile::InstanceProfile>(iid, profile));
+}
+
+void QueryContext::async_report_profile() {
+    _async_report_profile_x();
+    _async_report_profile_p();
+}
+
+void QueryContext::_async_report_profile_p() {
+    if (enable_pipeline_x_exec()) {
+        return;
+    }
+
+    std::lock_guard<std::mutex> lg(_profile_mutex);
+    LOG_INFO("Query {}, register query profile, instance profile count {}", 
print_id(_query_id),
+             _profile_map.size());
+
+    for (auto& [i_id, i_profile] : _profile_map) {
+        
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_instance_profile(
+                _query_id, i_id, this->coord_addr, i_profile.second);
+    }
+
+    _profile_map.clear();
+}
+
+void QueryContext::_async_report_profile_x() {

Review Comment:
   warning: method '_async_report_profile_x' can be made const 
[readability-make-member-function-const]
   
   ```suggestion
   void QueryContext::_async_report_profile_x() const {
   ```
   
   be/src/runtime/query_context.h:340:
   ```diff
   -     void _async_report_profile_x();
   +     void _async_report_profile_x() const;
   ```
   



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1546,4 +1516,31 @@ void 
FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_i
     }
 }
 
+profile::TRuntimeProfilePtr FragmentMgr::get_instance_profile(const TUniqueId& 
instance_id) {

Review Comment:
   warning: method 'get_instance_profile' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
   static profile::TRuntimeProfilePtr FragmentMgr::get_instance_profile(const 
TUniqueId& instance_id) {
   ```
   



##########
be/src/pipeline/pipeline_fragment_context.cpp:
##########
@@ -981,4 +983,49 @@ std::string PipelineFragmentContext::debug_string() {
     return fmt::to_string(debug_string_buffer);
 }
 
+std::vector<profile::TRuntimeProfilePtr> 
PipelineFragmentContext::collect_profile_x() const {
+    std::vector<profile::TRuntimeProfilePtr> res;
+    DCHECK(_query_ctx->enable_pipeline_x_exec() == true)
+            << fmt::format("Query {} calling a pipeline X function, but its 
pipeline X is disabled",
+                           print_id(this->_query_id));
+    std::stringstream ss;
+
+    for (auto& pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
+        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
+        pipeline_profile->to_thrift(&(*profile_ptr));
+        res.push_back(profile_ptr);
+
+        std::vector<RuntimeProfile*> task_x_profile;
+        pipeline_profile->get_all_children(&task_x_profile);
+        for (RuntimeProfile* p : task_x_profile) {
+            if (p->name().find("PipelineXTask") != std::string::npos) {
+                ss << p->name() << '\n';
+            } else {
+                ss << '\t' << p->name() << '\n';
+            }
+        }
+    }
+
+    LOG_INFO("Query X {} fragment {} profile\n{} ", print_id(this->_query_id), 
this->_fragment_id,
+             ss.str());
+    return res;
+}
+
+profile::TRuntimeProfilePtr PipelineFragmentContext::collect_profile_p() const 
{
+    profile::TRuntimeProfilePtr res = std::make_shared<TRuntimeProfileTree>();

Review Comment:
   warning: method 'collect_profile_p' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/pipeline/pipeline_fragment_context.h:150:
   ```diff
   -     profile::TRuntimeProfilePtr collect_profile_p() const;
   +     static profile::TRuntimeProfilePtr collect_profile_p() ;
   ```
   
   ```suggestion
   
   t {
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to