0AyanamiRei commented on code in PR #33945:
URL: https://github.com/apache/doris/pull/33945#discussion_r2591798085


##########
be/src/pipeline/pipeline_fragment_context.cpp:
##########
@@ -976,31 +1563,100 @@ Status PipelineFragmentContext::send_report(bool done) {
         return Status::NeedSendAgain("");
     }
 
+    std::vector<RuntimeState*> runtime_states;
+
+    for (auto& task_state : _task_runtime_states) {
+        runtime_states.push_back(task_state.get());
+    }
+
+    ReportStatusRequest req {true,
+                             exec_status,
+                             runtime_states,
+                             nullptr,
+                             _runtime_state->load_channel_profile(),
+                             done || !exec_status.ok(),
+                             _query_ctx->coord_addr,
+                             _query_id,
+                             _fragment_id,
+                             TUniqueId(),
+                             -1,
+                             _runtime_state.get(),
+                             [this](Status st) { return update_status(st); },
+                             [this](const PPlanFragmentCancelReason& reason,
+                                    const std::string& msg) { cancel(reason, 
msg); }};
+
     return _report_status_cb(
-            {false,
-             exec_status,
-             {},
-             _runtime_state->enable_profile() ? 
_runtime_state->runtime_profile() : nullptr,
-             _runtime_state->enable_profile() ? 
_runtime_state->load_channel_profile() : nullptr,
-             done || !exec_status.ok(),
-             _query_ctx->coord_addr,
-             _query_id,
-             _fragment_id,
-             _fragment_instance_id,
-             _backend_num,
-             _runtime_state.get(),
-             [this](Status st) { return update_status(st); },
-             [this](const PPlanFragmentCancelReason& reason, const 
std::string& msg) {
-                 cancel(reason, msg);
-             }},
-            
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
+            req, 
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
 }
 
 std::string PipelineFragmentContext::debug_string() {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info: QueryId 
= {}\n",
-                   print_id(_query_ctx->query_id()));
+    fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info:\n");
+    for (size_t j = 0; j < _tasks.size(); j++) {
+        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
+        for (size_t i = 0; i < _tasks[j].size(); i++) {
+            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i, 
_tasks[j][i]->debug_string());
+        }
+    }
+
     return fmt::to_string(debug_string_buffer);
 }
 
+std::vector<std::shared_ptr<TRuntimeProfileTree>>
+PipelineFragmentContext::collect_realtime_profile_x() const {
+    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
+    DCHECK(_query_ctx->enable_pipeline_x_exec() == true)
+            << fmt::format("Query {} calling a pipeline X function, but its 
pipeline X is disabled",
+                           print_id(this->_query_id));
+
+    // we do not have mutex to protect pipeline_id_to_profile
+    // so we need to make sure this funciton is invoked after fragment context
+    // has already been prepared.
+    if (!this->_prepared) {
+        std::string msg =
+                "Query " + print_id(this->_query_id) + " collecting profile, 
but its not prepared";
+        DCHECK(false) << msg;
+        LOG_ERROR(msg);
+        return res;
+    }
+
+    // pipeline_id_to_profile is initialized in prepare stage
+    for (auto& pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
+        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
+        pipeline_profile->to_thrift(profile_ptr.get());
+        res.push_back(profile_ptr);
+    }
+
+    return res;
+}
+
+std::shared_ptr<TRuntimeProfileTree>
+PipelineFragmentContext::collect_realtime_load_channel_profile_x() const {
+    // we do not have mutex to protect pipeline_id_to_profile
+    // so we need to make sure this funciton is invoked after fragment context
+    // has already been prepared.
+    if (!this->_prepared) {
+        std::string msg =
+                "Query " + print_id(this->_query_id) + " collecting profile, 
but its not prepared";
+        DCHECK(false) << msg;
+        LOG_ERROR(msg);
+        return nullptr;
+    }
+
+    for (auto& runtime_state : _task_runtime_states) {
+        if (runtime_state->runtime_profile() == nullptr) {
+            continue;
+        }
+
+        auto tmp_load_channel_profile = 
std::make_shared<TRuntimeProfileTree>();
+
+        
runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get());

Review Comment:
   In `VNodeChannel::_add_block_success_callback` we use 
`_state->load_channel_profile()->update(tprofile);`, so here it is an empty 
profile — we cannot see anything about "LoadChannel" in the profile file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to