github-actions[bot] commented on code in PR #33015:
URL: https://github.com/apache/doris/pull/33015#discussion_r1544120481


##########
be/src/pipeline/pipeline_fragment_context.cpp:
##########
@@ -981,4 +992,50 @@ std::string PipelineFragmentContext::debug_string() {
     return fmt::to_string(debug_string_buffer);
 }
 
+std::vector<profile::TRuntimeProfilePtr> 
PipelineFragmentContext::collect_profile_x() const {
+    std::vector<profile::TRuntimeProfilePtr> res;
+    DCHECK(_query_ctx->enable_pipeline_x_exec() == true)
+            << fmt::format("Query {} calling a pipeline X function, but its 
pipeline X is disabled",
+                           print_id(this->_query_id));
+
+    std::stringstream ss;
+
+    for (auto& pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
+        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
+        pipeline_profile->to_thrift(&(*profile_ptr));
+        res.push_back(profile_ptr);
+
+        std::vector<RuntimeProfile*> task_x_profile;
+        pipeline_profile->get_all_children(&task_x_profile);
+        for (RuntimeProfile* p : task_x_profile) {
+            if (p->name().find("PipelineXTask") != std::string::npos) {
+                ss << p->name() << '\n';
+            } else {
+                ss << '\t' << p->name() << '\n';
+            }
+        }
+    }
+
+    LOG_INFO("Query X {} fragment {} profile\n{} ", print_id(this->_query_id), 
this->_fragment_id,
+             ss.str());
+    return res;
+}
+
+profile::TRuntimeProfilePtr PipelineFragmentContext::collect_profile() const {
+    profile::TRuntimeProfilePtr res = std::make_shared<TRuntimeProfileTree>();

Review Comment:
   warning: method 'collect_profile' can be made static [readability-convert-member-functions-to-static]
   
   be/src/pipeline/pipeline_fragment_context.h:150:
   ```diff
   -     profile::TRuntimeProfilePtr collect_profile() const;
   +     static profile::TRuntimeProfilePtr collect_profile() ;
   ```
   
   ```suggestion
   
   t {
   ```
   



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -193,7 +193,7 @@ Status FragmentMgr::trigger_pipeline_context_report(
 // it is only invoked from the executor's reporting thread.
 // Also, the reported status will always reflect the most recent execution 
status,
 // including the final status when execution finishes.
-void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
+void FragmentMgr::report_status_callback(const ReportStatusRequest& req) {

Review Comment:
   warning: function 'report_status_callback' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   void FragmentMgr::report_status_callback(const ReportStatusRequest& req) {
                     ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/runtime/fragment_mgr.cpp:195:** 264 lines including whitespace and comments (threshold 80)
   ```cpp
   void FragmentMgr::report_status_callback(const ReportStatusRequest& req) {
                     ^
   ```
   
   </details>
   



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1546,4 +1577,17 @@
     }
 }
 
+void FragmentMgr::async_get_query_statics(const TUniqueId& query_id) {
+    std::lock_guard<std::mutex> lock(_lock);
+
+    auto iter = _query_ctx_map.find(query_id);
+
+    if (iter == _query_ctx_map.end()) {
+        throw doris::Exception(ErrorCode::NOT_FOUND, "query id not found");
+    }
+
+    iter->second->async_report_profile();
+    return;
+}

Review Comment:
   warning: redundant return statement at the end of a function with a void return type [readability-redundant-control-flow]
   
   ```suggestion
   }
   ```
   



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1546,4 +1577,17 @@
     }
 }
 
+void FragmentMgr::async_get_query_statics(const TUniqueId& query_id) {

Review Comment:
   warning: method 'async_get_query_statics' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static void FragmentMgr::async_get_query_statics(const TUniqueId& query_id) {
   ```
   



##########
be/src/pipeline/pipeline_fragment_context.h:
##########
@@ -17,21 +17,25 @@
 
 #pragma once
 
+#include <gen_cpp/RuntimeProfile_types.h>

Review Comment:
   warning: 'gen_cpp/RuntimeProfile_types.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <gen_cpp/RuntimeProfile_types.h>
            ^
   ```
   



##########
be/src/runtime/profile/profile.h:
##########
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/RuntimeProfile_types.h>

Review Comment:
   warning: 'gen_cpp/RuntimeProfile_types.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <gen_cpp/RuntimeProfile_types.h>
            ^
   ```
   



##########
be/src/runtime/query_context.h:
##########
@@ -18,15 +18,18 @@
 #pragma once
 
 #include <gen_cpp/PaloInternalService_types.h>

Review Comment:
   warning: 'gen_cpp/PaloInternalService_types.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <gen_cpp/PaloInternalService_types.h>
            ^
   ```
   



##########
be/src/runtime/query_context.cpp:
##########
@@ -281,4 +293,77 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& 
tg) {
     return Status::OK();
 }
 
+void QueryContext::async_report_profile_x() {

Review Comment:
   warning: method 'async_report_profile_x' can be made const [readability-make-member-function-const]
   
   ```suggestion
   void QueryContext::async_report_profile_x() const {
   ```
   
   be/src/runtime/query_context.h:337:
   ```diff
   -     void async_report_profile_x();
   +     void async_report_profile_x() const;
   ```
   



##########
be/src/service/backend_service.cpp:
##########
@@ -1125,4 +1128,39 @@ void 
BaseBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
     response.__set_status(Status::NotSupported("warm_up_tablets is not 
implemented").to_thrift());
 }
 
+void 
BaseBackendService::async_get_query_statics(TAsyncGetQueryStaticsResponse& 
_return,

Review Comment:
   warning: method 'async_get_query_statics' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static void 
BaseBackendService::async_get_query_statics(TAsyncGetQueryStaticsResponse& 
_return,
   ```
   



##########
be/src/runtime/runtime_query_statistics_mgr.h:
##########
@@ -18,10 +18,15 @@
 #pragma once
 
 #include <gen_cpp/Data_types.h>

Review Comment:
   warning: 'gen_cpp/Data_types.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <gen_cpp/Data_types.h>
            ^
   ```
   



##########
be/src/service/backend_service.cpp:
##########
@@ -1125,4 +1128,39 @@
     response.__set_status(Status::NotSupported("warm_up_tablets is not 
implemented").to_thrift());
 }
 
+void 
BaseBackendService::async_get_query_statics(TAsyncGetQueryStaticsResponse& 
_return,
+                                                 const 
TAsyncGetQueryStaticsRequest& request) {
+    _return = TAsyncGetQueryStaticsResponse();
+
+    if (!request.__isset.query_id) {
+        LOG_WARNING("Query_id is empty");
+        _return.__set_status(Status::InvalidArgument("query_id is 
empty").to_thrift());
+        return;
+    }
+
+    auto fragment_mgr = ExecEnv::GetInstance()->fragment_mgr();
+    if (fragment_mgr == nullptr) {
+        LOG_ERROR("Fragment manager is has not been created");
+        _return.__set_status(
+                Status::InternalError("Fragment manager has not been 
created").to_thrift());
+        return;
+    }
+
+    try {
+        fragment_mgr->async_get_query_statics(request.query_id);
+
+        
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->force_report_profile();
+
+        LOG_INFO("Async get {} query statics finished", 
print_id(request.query_id));
+    } catch (const doris::Exception& e) {
+        LOG_WARNING("Failed to async get {} query statics. error: {}", 
e.what());
+        _return.__set_status(
+                Status::NotFound("Query {} not found", 
print_id(request.query_id)).to_thrift());
+        return;
+    }
+
+    _return.__set_status(Status::OK().to_thrift());
+    return;
+}

Review Comment:
   warning: redundant return statement at the end of a function with a void return type [readability-redundant-control-flow]
   
   ```suggestion
   }
   ```
   



##########
be/src/runtime/query_context.cpp:
##########
@@ -281,4 +293,77 @@
     return Status::OK();
 }
 
+void QueryContext::async_report_profile_x() {
+    if (!enable_pipeline_x_exec()) {
+        return;
+    }
+
+    std::lock_guard<std::mutex> lg(_profile_mutex);
+    LOG_INFO(
+            "Pipeline x query context, register query profile, query {}, 
fragment profile count {}",
+            print_id(_query_id), _profile_map_x.size());
+
+    for (auto& [fid, f_profile] : _profile_map_x) {
+        auto tmp_f_profile = std::make_shared<profile::FragmentProfileX>();
+
+        for (auto p_profile : f_profile.second) {
+            tmp_f_profile->pipeline_profiles.push_back(
+                    std::make_shared<profile::PipelineProfileX>(fid, 
f_profile.first, p_profile));
+        }
+
+        
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile_x(
+                _query_id, fid, this->coord_addr, tmp_f_profile);
+    }
+
+    _profile_map_x.clear();
+}
+
+void QueryContext::add_pipeline_profile_x(int f_id, bool finished,
+                                          profile::TRuntimeProfilePtr profile) 
{
+    std::lock_guard<std::mutex> l(_profile_mutex);
+    LOG_INFO("Query X {} add pipeline profile, fid {}", 
print_id(this->_query_id), f_id);
+    _profile_map_x[f_id].first = finished;
+    _profile_map_x[f_id].second.push_back(profile);
+}
+
+void QueryContext::add_fragment_profile_x(
+        int f_id, bool finished, const 
std::vector<profile::TRuntimeProfilePtr>& pipeline_profile) {
+    LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline 
profile count {} ",
+             print_id(this->_query_id), f_id, pipeline_profile.size());
+
+    std::lock_guard<std::mutex> l(_profile_mutex);
+    _profile_map_x[f_id] = std::make_pair(finished, pipeline_profile);
+}
+
+void QueryContext::add_instance_profile(const TUniqueId& iid, bool finished,
+                                        profile::TRuntimeProfilePtr profile) {
+    // LOG_INFO("Query {} add instance profile, iid {}, finished {}", 
print_id(this->_query_id),
+    //          print_id(iid), finished);
+    DCHECK(profile != nullptr) << print_id(iid);
+    std::lock_guard<std::mutex> lg(_profile_mutex);
+    _profile_map[print_id(iid)] =
+            std::make_pair(finished, 
std::make_shared<profile::InstanceProfile>(iid, profile));
+}
+
+void QueryContext::async_report_profile() {

Review Comment:
   warning: method 'async_report_profile' can be made const [readability-make-member-function-const]
   
   ```suggestion
   void QueryContext::async_report_profile() const {
   ```
   
   be/src/runtime/query_context.h:344:
   ```diff
   -     void async_report_profile();
   +     void async_report_profile() const;
   ```
   



##########
be/src/service/backend_service.cpp:
##########
@@ -1125,4 +1128,39 @@
     response.__set_status(Status::NotSupported("warm_up_tablets is not 
implemented").to_thrift());
 }
 
+void 
BaseBackendService::async_get_query_statics(TAsyncGetQueryStaticsResponse& 
_return,
+                                                 const 
TAsyncGetQueryStaticsRequest& request) {
+    _return = TAsyncGetQueryStaticsResponse();
+
+    if (!request.__isset.query_id) {
+        LOG_WARNING("Query_id is empty");
+        _return.__set_status(Status::InvalidArgument("query_id is 
empty").to_thrift());
+        return;
+    }
+
+    auto fragment_mgr = ExecEnv::GetInstance()->fragment_mgr();

Review Comment:
   warning: 'auto fragment_mgr' can be declared as 'auto *fragment_mgr' [readability-qualified-auto]
   
   ```suggestion
       auto *fragment_mgr = ExecEnv::GetInstance()->fragment_mgr();
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to