github-actions[bot] commented on code in PR #33015:
URL: https://github.com/apache/doris/pull/33015#discussion_r1547111706


##########
be/src/runtime/query_context.cpp:
##########
@@ -293,4 +310,113 @@
     return Status::OK();
 }
 
+void QueryContext::add_pipeline_profile_x(int f_id, 
std::shared_ptr<TRuntimeProfileTree> profile) {
+    std::lock_guard<std::mutex> l(_profile_mutex);
+    LOG_INFO("Query X {} add pipeline profile, fid {}", 
print_id(this->_query_id), f_id);
+    DCHECK(_profile_map_x != nullptr && profile != nullptr) << fmt::format(
+            "Add pipeline profile failed, query {}, fragment {}", 
print_id(this->_query_id), f_id);
+
+    if (!_profile_map_x->contains(f_id)) {
+        LOG_WARNING("Query X {} add pipeline profile, fid {} not found", 
print_id(this->_query_id),
+                    f_id);
+        return;
+    }
+
+    (*_profile_map_x)[f_id].push_back(profile);
+}
+
+void QueryContext::add_fragment_profile_x(
+        int f_id, const std::vector<std::shared_ptr<TRuntimeProfileTree>>& 
pipeline_profiles) {
+    LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline 
profile count {} ",
+             print_id(this->_query_id), f_id, pipeline_profiles.size());
+
+    std::lock_guard<std::mutex> l(_profile_mutex);
+
+    DCHECK(_profile_map_x != nullptr) << fmt::format(
+            "Add pipeline profile failed, query {}, fragment {}", 
print_id(this->_query_id), f_id);
+#ifndef NDEBUG
+    for (const auto& p : pipeline_profiles) {
+        DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed, 
query {}, fragment {}",
+                                            print_id(this->_query_id), f_id);
+    }
+#endif
+    _profile_map_x->insert(std::make_pair(f_id, pipeline_profiles));
+}
+
+void QueryContext::add_instance_profile(const TUniqueId& i_id,
+                                        std::shared_ptr<TRuntimeProfileTree> 
profile) {
+    DCHECK(profile != nullptr) << print_id(i_id);
+    std::lock_guard<std::mutex> lg(_profile_mutex);
+    DCHECK(_profile_map != nullptr)
+            << fmt::format("Add pipeline profile failed, query {}, instance 
{}",
+                           print_id(this->_query_id), print_id(i_id));
+
+    _profile_map->insert(std::make_pair(i_id, profile));
+}
+
+void QueryContext::async_report_profile() {
+    _async_report_profile_x();
+    _async_report_profile_p();
+}
+
+void QueryContext::_async_report_profile_p() {

Review Comment:
   warning: method '_async_report_profile_p' can be made const 
[readability-make-member-function-const]
   
   ```suggestion
   void QueryContext::_async_report_profile_p() const {
   ```
   
   be/src/runtime/query_context.h:367:
   ```diff
   -     void _async_report_profile_p();
   +     void _async_report_profile_p() const;
   ```
   



##########
be/src/runtime/query_context.cpp:
##########
@@ -293,4 +310,113 @@
     return Status::OK();
 }
 
+void QueryContext::add_pipeline_profile_x(int f_id, 
std::shared_ptr<TRuntimeProfileTree> profile) {
+    std::lock_guard<std::mutex> l(_profile_mutex);
+    LOG_INFO("Query X {} add pipeline profile, fid {}", 
print_id(this->_query_id), f_id);
+    DCHECK(_profile_map_x != nullptr && profile != nullptr) << fmt::format(
+            "Add pipeline profile failed, query {}, fragment {}", 
print_id(this->_query_id), f_id);
+
+    if (!_profile_map_x->contains(f_id)) {
+        LOG_WARNING("Query X {} add pipeline profile, fid {} not found", 
print_id(this->_query_id),
+                    f_id);
+        return;
+    }
+
+    (*_profile_map_x)[f_id].push_back(profile);
+}
+
+void QueryContext::add_fragment_profile_x(
+        int f_id, const std::vector<std::shared_ptr<TRuntimeProfileTree>>& 
pipeline_profiles) {
+    LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline 
profile count {} ",
+             print_id(this->_query_id), f_id, pipeline_profiles.size());
+
+    std::lock_guard<std::mutex> l(_profile_mutex);
+
+    DCHECK(_profile_map_x != nullptr) << fmt::format(
+            "Add pipeline profile failed, query {}, fragment {}", 
print_id(this->_query_id), f_id);
+#ifndef NDEBUG
+    for (const auto& p : pipeline_profiles) {
+        DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed, 
query {}, fragment {}",
+                                            print_id(this->_query_id), f_id);
+    }
+#endif
+    _profile_map_x->insert(std::make_pair(f_id, pipeline_profiles));
+}
+
+void QueryContext::add_instance_profile(const TUniqueId& i_id,
+                                        std::shared_ptr<TRuntimeProfileTree> 
profile) {
+    DCHECK(profile != nullptr) << print_id(i_id);
+    std::lock_guard<std::mutex> lg(_profile_mutex);
+    DCHECK(_profile_map != nullptr)
+            << fmt::format("Add pipeline profile failed, query {}, instance 
{}",
+                           print_id(this->_query_id), print_id(i_id));
+
+    _profile_map->insert(std::make_pair(i_id, profile));
+}
+
+void QueryContext::async_report_profile() {
+    _async_report_profile_x();
+    _async_report_profile_p();
+}
+
+void QueryContext::_async_report_profile_p() {
+    if (enable_pipeline_x_exec()) {
+        return;
+    }
+
+    std::lock_guard<std::mutex> lg(_profile_mutex);
+    LOG_INFO("Query {}, register query profile, instance profile count {}", 
print_id(_query_id),
+             _profile_map->size());
+
+    for (auto& [i_id, i_profile] : *_profile_map) {
+        
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_instance_profile(
+                _query_id, this->coord_addr, i_id, i_profile);
+    }
+
+    _profile_map->clear();
+}
+
+void QueryContext::_async_report_profile_x() {

Review Comment:
   warning: method '_async_report_profile_x' can be made const 
[readability-make-member-function-const]
   
   ```suggestion
   void QueryContext::_async_report_profile_x() const {
   ```
   
   be/src/runtime/query_context.h:368:
   ```diff
   -     void _async_report_profile_x();
   +     void _async_report_profile_x() const;
   ```
   



##########
be/src/runtime/runtime_query_statistics_mgr.h:
##########
@@ -75,9 +82,65 @@ class RuntimeQueryStatiticsMgr {
     // used for backend_active_tasks
     void get_active_be_tasks_block(vectorized::Block* block);
 
+    void start_report_thread();
+    void report_query_profiles_thread();
+    void force_report_profile();
+    void stop_report_thread();
+
+    void submit_report_status_task(const TUniqueId& q_id, const 
TNetworkAddress& coor_addr,
+                                   int32 f_id, const TUniqueId& i_id, int 
be_num, bool done,
+                                   Status exec_status);
+
+    void register_instance_profile(const TUniqueId& query_id, const 
TNetworkAddress& coor_addr,
+                                   const TUniqueId& instance_id,
+                                   std::shared_ptr<TRuntimeProfileTree> 
i_profile);
+
+    void register_fragment_profile_x(const TUniqueId& query_id, const 
TNetworkAddress& const_addr,
+                                     int32_t fragment_id,
+                                     
std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles);
+    using CoorAddrWithFragmentProfileMap =
+            std::tuple<TNetworkAddress,
+                       std::unordered_map<int, 
std::vector<std::shared_ptr<TRuntimeProfileTree>>>>;
+
+    using CoorAddrWithInstanceProfileMap =
+            std::tuple<TNetworkAddress,
+                       std::unordered_map<TUniqueId, 
std::shared_ptr<TRuntimeProfileTree>>>;
+
+    static TReportExecStatusParams create_report_exec_status_params(
+            const TUniqueId& q_id, int32 f_id,
+            const std::vector<std::shared_ptr<TRuntimeProfileTree>>& 
f_profile);
+
+    static void report_query_profile(
+            const TUniqueId&, const TNetworkAddress&,

Review Comment:
   warning: parameter 3 is const-qualified in the function declaration; 
const-qualification of parameters only has an effect in function definitions 
[readability-avoid-const-params-in-decls]
   
   ```suggestion
               std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>&);
   ```
   



##########
be/src/runtime/query_context.cpp:
##########
@@ -293,4 +310,113 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& 
tg) {
     return Status::OK();
 }
 
+void QueryContext::add_pipeline_profile_x(int f_id, 
std::shared_ptr<TRuntimeProfileTree> profile) {

Review Comment:
   warning: method 'add_pipeline_profile_x' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/runtime/query_context.h:373:
   ```diff
   -     void add_pipeline_profile_x(int f_id, 
std::shared_ptr<TRuntimeProfileTree> profile);
   +     static void add_pipeline_profile_x(int f_id, 
std::shared_ptr<TRuntimeProfileTree> profile);
   ```
   



##########
be/src/pipeline/pipeline_fragment_context.cpp:
##########
@@ -993,4 +995,50 @@ std::string PipelineFragmentContext::debug_string() {
     return fmt::to_string(debug_string_buffer);
 }
 
+std::vector<std::shared_ptr<TRuntimeProfileTree>> 
PipelineFragmentContext::collect_profile_x()
+        const {
+    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
+    DCHECK(_query_ctx->enable_pipeline_x_exec() == true)
+            << fmt::format("Query {} calling a pipeline X function, but its 
pipeline X is disabled",
+                           print_id(this->_query_id));
+    std::stringstream ss;
+
+    for (auto& pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
+        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
+        pipeline_profile->to_thrift(&(*profile_ptr));
+        res.push_back(profile_ptr);
+
+        std::vector<RuntimeProfile*> task_x_profile;
+        pipeline_profile->get_all_children(&task_x_profile);
+        for (RuntimeProfile* p : task_x_profile) {
+            if (p->name().find("PipelineXTask") != std::string::npos) {
+                ss << p->name() << '\n';
+            } else {
+                ss << '\t' << p->name() << '\n';
+            }
+        }
+    }
+
+    LOG_INFO("Query X {} fragment {} profile\n{} ", print_id(this->_query_id), 
this->_fragment_id,
+             ss.str());
+    return res;
+}
+
+std::shared_ptr<TRuntimeProfileTree> 
PipelineFragmentContext::collect_profile_p() const {
+    auto res = std::make_shared<TRuntimeProfileTree>();

Review Comment:
   warning: method 'collect_profile_p' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/pipeline/pipeline_fragment_context.h:149:
   ```diff
   -     std::shared_ptr<TRuntimeProfileTree> collect_profile_p() const;
   +     static std::shared_ptr<TRuntimeProfileTree> collect_profile_p() ;
   ```
   
   ```suggestion
   std::shared_ptr<TRuntimeProfileTree> PipelineFragmentContext::collect_profile_p() {
   ```
   



##########
be/src/service/backend_service.cpp:
##########
@@ -1129,4 +1133,51 @@
     response.__set_status(Status::NotSupported("warm_up_tablets is not 
implemented").to_thrift());
 }
 
+void BaseBackendService::get_realtime_query_exec_status(
+        TGetRealTimeQueryExecStatusResponse& _return,
+        const TGetRealTimeQueryExecStatusRequest& request) {
+    _return = TGetRealTimeQueryExecStatusResponse();
+
+    if (!request.__isset.query_id) {
+        LOG_WARNING("Query_id is empty");
+        _return.__set_status(Status::InvalidArgument("query_id is 
empty").to_thrift());
+        return;
+    }
+

Review Comment:
   warning: 'auto fragment_mgr' can be declared as 'auto *fragment_mgr' 
[readability-qualified-auto]
   
   ```suggestion
       auto *fragment_mgr = ExecEnv::GetInstance()->fragment_mgr();
   ```
   



##########
be/src/service/backend_service.cpp:
##########
@@ -1129,4 +1133,51 @@ void 
BaseBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
     response.__set_status(Status::NotSupported("warm_up_tablets is not 
implemented").to_thrift());
 }
 
+void BaseBackendService::get_realtime_query_exec_status(

Review Comment:
   warning: method 'get_realtime_query_exec_status' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
   static void BaseBackendService::get_realtime_query_exec_status(TGetRealTimeQueryExecStatusResponse& _return,
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to