This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d5a6abb9c2 [refactor](profile) refactor profile report on BE (#33331)
7d5a6abb9c2 is described below

commit 7d5a6abb9c2439e29b89bad9b8a35ee1284adf41
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Wed Apr 17 13:54:03 2024 +0800

    [refactor](profile) refactor profile report on BE (#33331)
    
    First task of #33744
    
    During the close stage, PipelineXFragmentContext will collect its profile 
and register it with QueryContext.
    The destructor of QueryContext will add all of its profiles to 
RuntimeQueryStatiticsMgr and trigger a report thread to report them.
    The report thread of RuntimeQueryStatiticsMgr will collect all registered 
profiles and report them per query.
    RuntimeQueryStatiticsMgr will create 5 threads to do the profile report 
task; when BE stops gracefully, they will flush all profiles, which is 
important in cloud mode.
---
 be/src/common/config.cpp                           |   2 +
 be/src/common/config.h                             |   1 +
 be/src/pipeline/pipeline_fragment_context.h        |   2 +-
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  90 ++++-
 .../pipeline_x/pipeline_x_fragment_context.h       |   3 +
 be/src/runtime/exec_env_init.cpp                   |   5 +-
 be/src/runtime/fragment_mgr.cpp                    |  59 +++
 be/src/runtime/fragment_mgr.h                      |   5 +
 be/src/runtime/plan_fragment_executor.cpp          |  33 ++
 be/src/runtime/plan_fragment_executor.h            |   4 +
 be/src/runtime/query_context.cpp                   | 181 ++++++++-
 be/src/runtime/query_context.h                     |  58 +++
 be/src/runtime/runtime_query_statistics_mgr.cpp    | 421 +++++++++++++++++++++
 be/src/runtime/runtime_query_statistics_mgr.h      |  70 ++++
 be/src/service/backend_service.cpp                 |  26 ++
 be/src/service/backend_service.h                   |   3 +
 .../doris/common/profile/ExecutionProfile.java     |  79 ++++
 .../java/org/apache/doris/qe/QeProcessorImpl.java  |  33 ++
 .../org/apache/doris/common/GenericPoolTest.java   |   8 +
 .../apache/doris/utframe/MockedBackendFactory.java |   8 +
 gensrc/thrift/BackendService.thrift                |  13 +
 gensrc/thrift/FrontendService.thrift               |  22 +-
 22 files changed, 1112 insertions(+), 14 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 13acfd49042..603b1739062 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1153,6 +1153,8 @@ DEFINE_mInt32(report_query_statistics_interval_ms, 
"3000");
 // 30s
 DEFINE_mInt32(query_statistics_reserve_timeout_ms, "30000");
 
+DEFINE_mInt32(report_exec_status_thread_num, "5");
+
 // consider two high usage disk at the same available level if they do not 
exceed this diff.
 DEFINE_mDouble(high_disk_avail_level_diff_usages, "0.15");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9f226e22be2..2aca6a31fbb 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1235,6 +1235,7 @@ DECLARE_Int32(ignore_invalid_partition_id_rowset_num);
 
 DECLARE_mInt32(report_query_statistics_interval_ms);
 DECLARE_mInt32(query_statistics_reserve_timeout_ms);
+DECLARE_mInt32(report_exec_status_thread_num);
 
 // consider two high usage disk at the same available level if they do not 
exceed this diff.
 DECLARE_mDouble(high_disk_avail_level_diff_usages);
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 009a2a2f22d..b9bfcb28f68 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -156,7 +156,7 @@ protected:
 
     ExecEnv* _exec_env = nullptr;
 
-    bool _prepared = false;
+    std::atomic_bool _prepared = false;
     bool _submitted = false;
 
     std::mutex _status_lock;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index f23e39472ab..3b03fbbf400 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -21,6 +21,7 @@
 #include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/PlanNodes_types.h>
 #include <gen_cpp/Planner_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
 #include <pthread.h>
 #include <runtime/result_buffer_mgr.h>
 
@@ -1378,6 +1379,12 @@ void 
PipelineXFragmentContext::_close_fragment_instance() {
         LOG_INFO("Query {} fragment {} profile:\n {}", 
print_id(this->_query_id),
                  this->_fragment_id, ss.str());
     }
+
+    if (_query_ctx->enable_profile()) {
+        _query_ctx->add_fragment_profile_x(_fragment_id, 
collect_realtime_profile_x(),
+                                           
collect_realtime_load_channel_profile_x());
+    }
+
     // all submitted tasks done
     _exec_env->fragment_mgr()->remove_pipeline_context(
             
std::dynamic_pointer_cast<PipelineXFragmentContext>(shared_from_this()));
@@ -1409,15 +1416,82 @@ Status PipelineXFragmentContext::send_report(bool done) 
{
     for (auto& task_state : _task_runtime_states) {
         runtime_states.push_back(task_state.get());
     }
+
+    ReportStatusRequest req {true,
+                             exec_status,
+                             runtime_states,
+                             nullptr,
+                             _runtime_state->load_channel_profile(),
+                             done || !exec_status.ok(),
+                             _query_ctx->coord_addr,
+                             _query_id,
+                             _fragment_id,
+                             TUniqueId(),
+                             _backend_num,
+                             _runtime_state.get(),
+                             [this](Status st) { return update_status(st); },
+                             [this](const PPlanFragmentCancelReason& reason,
+                                    const std::string& msg) { cancel(reason, 
msg); }};
+
     return _report_status_cb(
-            {true, exec_status, runtime_states, nullptr, 
_runtime_state->load_channel_profile(),
-             done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, 
_fragment_id,
-             TUniqueId(), _backend_num, _runtime_state.get(),
-             [this](Status st) { return update_status(st); },
-             [this](const PPlanFragmentCancelReason& reason, const 
std::string& msg) {
-                 cancel(reason, msg);
-             }},
-            
std::dynamic_pointer_cast<PipelineXFragmentContext>(shared_from_this()));
+            req, 
std::dynamic_pointer_cast<PipelineXFragmentContext>(shared_from_this()));
+}
+
+std::vector<std::shared_ptr<TRuntimeProfileTree>>
+PipelineXFragmentContext::collect_realtime_profile_x() const {
+    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
+    DCHECK(_query_ctx->enable_pipeline_x_exec() == true)
+            << fmt::format("Query {} calling a pipeline X function, but its 
pipeline X is disabled",
+                           print_id(this->_query_id));
+
+    // we do not have mutex to protect pipeline_id_to_profile
+    // so we need to make sure this funciton is invoked after fragment context
+    // has already been prepared.
+    if (!this->_prepared) {
+        std::string msg =
+                "Query " + print_id(this->_query_id) + " collecting profile, 
but its not prepared";
+        DCHECK(false) << msg;
+        LOG_ERROR(msg);
+        return res;
+    }
+
+    // pipeline_id_to_profile is initialized in prepare stage
+    for (auto& pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
+        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
+        pipeline_profile->to_thrift(profile_ptr.get());
+        res.push_back(profile_ptr);
+    }
+
+    return res;
+}
+
+std::shared_ptr<TRuntimeProfileTree>
+PipelineXFragmentContext::collect_realtime_load_channel_profile_x() const {
+    // we do not have mutex to protect pipeline_id_to_profile
+    // so we need to make sure this funciton is invoked after fragment context
+    // has already been prepared.
+    if (!this->_prepared) {
+        std::string msg =
+                "Query " + print_id(this->_query_id) + " collecting profile, 
but its not prepared";
+        DCHECK(false) << msg;
+        LOG_ERROR(msg);
+        return nullptr;
+    }
+
+    for (auto& runtime_state : _task_runtime_states) {
+        if (runtime_state->runtime_profile() == nullptr) {
+            continue;
+        }
+
+        auto tmp_load_channel_profile = 
std::make_shared<TRuntimeProfileTree>();
+
+        
runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get());
+        
this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
+    }
+
+    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
+    
this->_runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get());
+    return load_channel_profile;
 }
 
 std::string PipelineXFragmentContext::debug_string() {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 34d00c07652..31febc0d8aa 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -112,6 +112,9 @@ public:
 
     [[nodiscard]] int max_sink_operator_id() const { return _sink_operator_id; 
}
 
+    std::vector<std::shared_ptr<TRuntimeProfileTree>> 
collect_realtime_profile_x() const;
+    std::shared_ptr<TRuntimeProfileTree> 
collect_realtime_load_channel_profile_x() const;
+
     std::string debug_string() override;
 
 private:
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 76c877c3695..37cd51d1ca4 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -308,7 +308,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
     _workload_sched_mgr->start(this);
 
     RETURN_IF_ERROR(_spill_stream_mgr->init());
-
+    _runtime_query_statistics_mgr->start_report_thread();
     _s_ready = true;
 
     return Status::OK();
@@ -606,6 +606,9 @@ void ExecEnv::destroy() {
     _storage_engine.reset();
 
     SAFE_STOP(_spill_stream_mgr);
+    if (_runtime_query_statistics_mgr) {
+        _runtime_query_statistics_mgr->stop_report_thread();
+    }
     SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool);
     SAFE_SHUTDOWN(_s3_file_upload_thread_pool);
     SAFE_SHUTDOWN(_join_node_thread_pool);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 14f77a98ace..bc80dca583d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -29,6 +29,7 @@
 #include <gen_cpp/PlanNodes_types.h>
 #include <gen_cpp/Planner_types.h>
 #include <gen_cpp/QueryPlanExtra_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/internal_service.pb.h>
 #include <pthread.h>
@@ -36,6 +37,7 @@
 #include <thrift/Thrift.h>
 #include <thrift/protocol/TDebugProtocol.h>
 #include <thrift/transport/TTransportException.h>
+#include <unistd.h>
 
 #include <atomic>
 
@@ -47,6 +49,7 @@
 #include <memory>
 #include <mutex>
 #include <sstream>
+#include <unordered_map>
 #include <utility>
 
 #include "common/config.h"
@@ -483,6 +486,7 @@ void 
FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
                             
std::chrono::system_clock::now().time_since_epoch())
                             .count();
         std::lock_guard<std::mutex> lock(_lock);
+
         
_fragment_instance_map.erase(fragment_executor->fragment_instance_id());
 
         g_fragment_executing_count << -1;
@@ -1612,4 +1616,59 @@ void 
FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_i
     }
 }
 
+Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,
+                                             TReportExecStatusParams* 
exec_status) {
+    if (exec_status == nullptr) {
+        return Status::InvalidArgument("exes_status is nullptr");
+    }
+
+    std::shared_ptr<QueryContext> query_context = nullptr;
+
+    {
+        std::lock_guard<std::mutex> lock(_lock);
+        query_context = _query_ctx_map[query_id];
+    }
+
+    if (query_context == nullptr) {
+        return Status::NotFound("Query {} not found", print_id(query_id));
+    }
+
+    if (query_context->enable_pipeline_x_exec()) {
+        *exec_status = query_context->get_realtime_exec_status_x();
+    } else {
+        auto instance_ids = query_context->get_fragment_instance_ids();
+        std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> 
instance_profiles;
+        std::vector<std::shared_ptr<TRuntimeProfileTree>> 
load_channel_profiles;
+
+        for (auto& instance_id : instance_ids) {
+            std::shared_ptr<PlanFragmentExecutor> instance_executor = nullptr;
+
+            {
+                std::lock_guard<std::mutex> lock(_lock);
+                instance_executor = _fragment_instance_map[instance_id];
+            }
+
+            if (instance_executor == nullptr) {
+                return Status::NotFound("Fragment instance {} not found", 
print_id(instance_id));
+            }
+
+            if (auto instance_profile = 
instance_executor->collect_realtime_query_profile()) {
+                instance_profiles.insert(std::make_pair(instance_id, 
instance_profile));
+            } else {
+                continue;
+            }
+
+            if (auto load_channel_profile =
+                        
instance_executor->collect_realtime_load_channel_profile()) {
+                load_channel_profiles.push_back(load_channel_profile);
+            }
+        }
+
+        *exec_status = 
RuntimeQueryStatiticsMgr::create_report_exec_status_params_non_pipeline(
+                query_id, instance_profiles, load_channel_profiles);
+    }
+
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index e748c88cece..25b555f4fe8 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <gen_cpp/FrontendService_types.h>
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/types.pb.h>
 #include <stdint.h>
@@ -33,6 +34,7 @@
 #include "common/status.h"
 #include "gutil/ref_counted.h"
 #include "http/rest_monitor_iface.h"
+#include "runtime/plan_fragment_executor.h"
 #include "runtime/query_context.h"
 #include "runtime_filter_mgr.h"
 #include "util/countdown_latch.h"
@@ -151,6 +153,9 @@ public:
 
     void get_runtime_query_info(std::vector<WorkloadQueryInfo>* 
_query_info_list);
 
+    Status get_realtime_exec_status(const TUniqueId& query_id,
+                                    TReportExecStatusParams* exec_status);
+
 private:
     void cancel_unlocked_impl(const TUniqueId& id, const 
PPlanFragmentCancelReason& reason,
                               const std::unique_lock<std::mutex>& state_lock, 
bool is_pipeline,
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index 7bf4f3846dd..5c0668ef55c 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -25,11 +25,13 @@
 #include <gen_cpp/Metrics_types.h>
 #include <gen_cpp/PlanNodes_types.h>
 #include <gen_cpp/Planner_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
 #include <pthread.h>
 #include <stdint.h>
 #include <stdlib.h>
 // IWYU pragma: no_include <bits/chrono.h>
 #include <chrono> // IWYU pragma: keep
+#include <memory>
 #include <ostream>
 #include <typeinfo>
 #include <utility>
@@ -638,10 +640,41 @@ void PlanFragmentExecutor::close() {
                 load_channel_profile()->pretty_print(&ss);
             }
             LOG(INFO) << ss.str();
+
+            _query_ctx->add_instance_profile(_fragment_instance_id,
+                                             collect_realtime_query_profile(),
+                                             
collect_realtime_load_channel_profile());
         }
     }
 
     _closed = true;
 }
 
+std::shared_ptr<TRuntimeProfileTree> 
PlanFragmentExecutor::collect_realtime_query_profile() const {
+    std::shared_ptr<TRuntimeProfileTree> res = 
std::make_shared<TRuntimeProfileTree>();
+
+    if (_runtime_state != nullptr) {
+        _runtime_state->runtime_profile()->compute_time_in_profile();
+        _runtime_state->runtime_profile()->to_thrift(res.get());
+    } else {
+        return nullptr;
+    }
+
+    return res;
+}
+
+std::shared_ptr<TRuntimeProfileTree> 
PlanFragmentExecutor::collect_realtime_load_channel_profile()
+        const {
+    std::shared_ptr<TRuntimeProfileTree> res = 
std::make_shared<TRuntimeProfileTree>();
+
+    if (_runtime_state != nullptr) {
+        _runtime_state->load_channel_profile()->compute_time_in_profile();
+        _runtime_state->load_channel_profile()->to_thrift(res.get());
+    } else {
+        return nullptr;
+    }
+
+    return res;
+}
+
 } // namespace doris
diff --git a/be/src/runtime/plan_fragment_executor.h 
b/be/src/runtime/plan_fragment_executor.h
index 89b2534b61b..e4d29af9ae9 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -21,6 +21,7 @@
 #pragma once
 
 #include <gen_cpp/PaloInternalService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/types.pb.h>
 
@@ -146,6 +147,9 @@ public:
 
     Status update_status(Status status);
 
+    std::shared_ptr<TRuntimeProfileTree> collect_realtime_query_profile() 
const;
+    std::shared_ptr<TRuntimeProfileTree> 
collect_realtime_load_channel_profile() const;
+
 private:
     ExecEnv* _exec_env = nullptr; // not owned
     ExecNode* _plan = nullptr;    // lives in _runtime_state->obj_pool()
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 10f54255741..0a76a13cac9 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -17,15 +17,31 @@
 
 #include "runtime/query_context.h"
 
+#include <fmt/core.h>
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Types_types.h>
+#include <glog/logging.h>
+
 #include <exception>
 #include <memory>
+#include <mutex>
+#include <sstream>
+#include <utility>
 
+#include "common/logging.h"
+#include "olap/olap_common.h"
 #include "pipeline/pipeline_fragment_context.h"
 #include "pipeline/pipeline_x/dependency.h"
+#include "pipeline/pipeline_x/pipeline_x_fragment_context.h"
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
 #include "runtime/runtime_query_statistics_mgr.h"
+#include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "runtime/workload_group/workload_group_manager.h"
 #include "util/mem_info.h"
+#include "util/uid_util.h"
 
 namespace doris {
 
@@ -117,7 +133,11 @@ QueryContext::~QueryContext() {
     }
 
     
_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
-    LOG_INFO("Query {} deconstructed, {}", print_id(_query_id), 
mem_tracker_msg);
+
+    if (enable_profile()) {
+        _report_query_profile();
+    }
+
     // Not release the the thread token in query context's dector method, 
because the query
     // conext may be dectored in the thread token it self. It is very 
dangerous and may core.
     // And also thread token need shutdown, it may take some time, may cause 
the thread that
@@ -146,6 +166,8 @@ QueryContext::~QueryContext() {
     _runtime_predicates.clear();
     file_scan_range_params_map.clear();
     obj_pool.clear();
+
+    LOG_INFO("Query {} deconstructed, {}", print_id(this->_query_id), 
mem_tracker_msg);
 }
 
 void QueryContext::set_ready_to_execute(bool is_cancelled) {
@@ -303,4 +325,161 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& 
tg) {
     return Status::OK();
 }
 
+void QueryContext::add_fragment_profile_x(
+        int fragment_id, const 
std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles,
+        std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
+    if (pipeline_profiles.empty()) {
+        std::string msg = fmt::format("Add pipeline profile failed, query {}, 
fragment {}",
+                                      print_id(this->_query_id), fragment_id);
+        LOG_ERROR(msg);
+        DCHECK(false) << msg;
+        return;
+    }
+
+#ifndef NDEBUG
+    for (const auto& p : pipeline_profiles) {
+        DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed, 
query {}, fragment {}",
+                                            print_id(this->_query_id), 
fragment_id);
+    }
+#endif
+
+    std::lock_guard<std::mutex> l(_profile_mutex);
+    LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline 
profile count {} ",
+             print_id(this->_query_id), fragment_id, pipeline_profiles.size());
+
+    _profile_map_x.insert(std::make_pair(fragment_id, pipeline_profiles));
+
+    if (load_channel_profile != nullptr) {
+        _load_channel_profile_map_x.insert(std::make_pair(fragment_id, 
load_channel_profile));
+    }
+}
+
+void QueryContext::add_instance_profile(const TUniqueId& instance_id,
+                                        std::shared_ptr<TRuntimeProfileTree> 
profile,
+                                        std::shared_ptr<TRuntimeProfileTree> 
load_channel_profile) {
+    DCHECK(profile != nullptr) << print_id(instance_id);
+
+    std::lock_guard<std::mutex> lg(_profile_mutex);
+    _profile_map.insert(std::make_pair(instance_id, profile));
+    if (load_channel_profile != nullptr) {
+        _load_channel_profile_map.insert(std::make_pair(instance_id, 
load_channel_profile));
+    }
+}
+
+void QueryContext::_report_query_profile() {
+    _report_query_profile_x();
+    _report_query_profile_non_pipeline();
+}
+
+void QueryContext::_report_query_profile_non_pipeline() {
+    if (enable_pipeline_exec() || enable_pipeline_x_exec()) {
+        return;
+    }
+
+    std::lock_guard<std::mutex> lg(_profile_mutex);
+    LOG_INFO("Query {}, register query profile, instance profile count {}", 
print_id(_query_id),
+             _profile_map.size());
+
+    for (auto& [instance_id, instance_profile] : _profile_map) {
+        std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;
+        if (_load_channel_profile_map.contains(instance_id)) {
+            load_channel_profile = _load_channel_profile_map[instance_id];
+        }
+
+        
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_instance_profile(
+                _query_id, this->coord_addr, instance_id, instance_profile, 
load_channel_profile);
+    }
+
+    
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile();
+}
+
+void QueryContext::_report_query_profile_x() {
+    if (!enable_pipeline_x_exec()) {
+        return;
+    }
+
+    std::lock_guard<std::mutex> lg(_profile_mutex);
+    LOG_INFO(
+            "Pipeline x query context, register query profile, query {}, 
fragment profile count {}",
+            print_id(_query_id), _profile_map_x.size());
+
+    for (auto& [fragment_id, fragment_profile] : _profile_map_x) {
+        std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;
+
+        if (_load_channel_profile_map_x.contains(fragment_id)) {
+            load_channel_profile = _load_channel_profile_map_x[fragment_id];
+        }
+
+        
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile_x(
+                _query_id, this->coord_addr, fragment_id, fragment_profile, 
load_channel_profile);
+    }
+
+    
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile();
+}
+
+std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
+QueryContext::_collect_realtime_query_profile_x() const {
+    std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> 
res;
+
+    if (!enable_pipeline_x_exec()) {
+        return res;
+    }
+
+    for (auto& [fragment_id, fragment_ctx_wptr] : 
_fragment_id_to_pipeline_ctx) {
+        if (auto fragment_ctx = fragment_ctx_wptr.lock()) {
+            // In theory, cast result can not be nullptr since we have checked 
the pipeline X engine above
+            std::shared_ptr<pipeline::PipelineXFragmentContext> fragment_ctx_x 
=
+                    
std::dynamic_pointer_cast<pipeline::PipelineXFragmentContext>(fragment_ctx);
+
+            if (fragment_ctx_x == nullptr) {
+                std::string msg =
+                        fmt::format("PipelineXFragmentContext is nullptr, 
query {} fragment_id: {}",
+                                    print_id(_query_id), fragment_id);
+                LOG_ERROR(msg);
+                DCHECK(false) << msg;
+                continue;
+            }
+
+            auto profile = fragment_ctx_x->collect_realtime_profile_x();
+
+            if (profile.empty()) {
+                std::string err_msg = fmt::format(
+                        "Get nothing when collecting profile, query {}, 
fragment_id: {}",
+                        print_id(_query_id), fragment_id);
+                LOG_ERROR(err_msg);
+                DCHECK(false) << err_msg;
+                continue;
+            }
+
+            res.insert(std::make_pair(fragment_id, profile));
+        }
+    }
+
+    return res;
+}
+
+TReportExecStatusParams QueryContext::get_realtime_exec_status_x() const {
+    TReportExecStatusParams exec_status;
+
+    if (enable_pipeline_x_exec()) {
+        auto realtime_query_profile = _collect_realtime_query_profile_x();
+        std::vector<std::shared_ptr<TRuntimeProfileTree>> 
load_channel_profiles;
+
+        for (auto load_channel_profile : _load_channel_profile_map_x) {
+            if (load_channel_profile.second != nullptr) {
+                load_channel_profiles.push_back(load_channel_profile.second);
+            }
+        }
+
+        exec_status = 
RuntimeQueryStatiticsMgr::create_report_exec_status_params_x(
+                this->_query_id, realtime_query_profile, 
load_channel_profiles);
+    } else {
+        auto msg = fmt::format("Query {} is not pipelineX query", 
print_id(_query_id));
+        LOG_ERROR(msg);
+        DCHECK(false) << msg;
+    }
+
+    return exec_status;
+}
+
 } // namespace doris
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index c78886997d0..b9a7e780375 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -18,11 +18,14 @@
 #pragma once
 
 #include <gen_cpp/PaloInternalService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
 #include <gen_cpp/Types_types.h>
 
 #include <atomic>
 #include <memory>
+#include <mutex>
 #include <string>
+#include <unordered_map>
 
 #include "common/config.h"
 #include "common/factory_creator.h"
@@ -32,6 +35,7 @@
 #include "runtime/query_statistics.h"
 #include "runtime/runtime_filter_mgr.h"
 #include "runtime/runtime_predicate.h"
+#include "util/hash_util.hpp"
 #include "util/threadpool.h"
 #include "vec/exec/scan/scanner_scheduler.h"
 #include "vec/runtime/shared_hash_table_controller.h"
@@ -238,6 +242,8 @@ public:
 
     ThreadPool* get_non_pipe_exec_thread_pool();
 
+    std::vector<TUniqueId> get_fragment_instance_ids() const { return 
fragment_instance_ids; }
+
     int64_t mem_limit() const { return _bytes_limit; }
 
     void set_merge_controller_handler(
@@ -351,6 +357,58 @@ private:
     std::mutex _weighted_mem_lock;
     int64_t _weighted_consumption = 0;
     int64_t _weighted_limit = 0;
+
+    std::mutex _profile_mutex;
+
+    // when fragment of pipeline x is closed, it will register its profile to 
this map by using add_fragment_profile_x
+    // flatten profile of one fragment:
+    // Pipeline 0
+    //      PipelineTask 0
+    //              Operator 1
+    //              Operator 2
+    //              Scanner
+    //      PipelineTask 1
+    //              Operator 1
+    //              Operator 2
+    //              Scanner
+    // Pipeline 1
+    //      PipelineTask 2
+    //              Operator 3
+    //      PipelineTask 3
+    //              Operator 3
+    // fragment_id -> list<profile>
+    std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> 
_profile_map_x;
+    std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>> 
_load_channel_profile_map_x;
+
+    // instance_id -> profile
+    std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> 
_profile_map;
+    std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> 
_load_channel_profile_map;
+
+    void _report_query_profile();
+    void _report_query_profile_non_pipeline();
+    void _report_query_profile_x();
+
+    std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
+    _collect_realtime_query_profile_x() const;
+
+    std::unordered_map<TUniqueId, 
std::vector<std::shared_ptr<TRuntimeProfileTree>>>
+    _collect_realtime_query_profile_non_pipeline() const;
+
+public:
+    // when fragment of pipeline x is closed, it will register its profile to 
this map by using add_fragment_profile_x
+    void add_fragment_profile_x(
+            int fragment_id,
+            const std::vector<std::shared_ptr<TRuntimeProfileTree>>& 
pipeline_profile,
+            std::shared_ptr<TRuntimeProfileTree> load_channel_profile);
+
+    void add_instance_profile(const TUniqueId& iid, 
std::shared_ptr<TRuntimeProfileTree> profile,
+                              std::shared_ptr<TRuntimeProfileTree> 
load_channel_profile);
+
+    TReportExecStatusParams get_realtime_exec_status_x() const;
+
+    bool enable_profile() const {
+        return _query_options.__isset.enable_profile && 
_query_options.enable_profile;
+    }
 };
 
 } // namespace doris
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp 
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 9764b0f0507..a59938b7d51 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -17,13 +17,434 @@
 
 #include "runtime/runtime_query_statistics_mgr.h"
 
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Status_types.h>
+#include <gen_cpp/Types_types.h>
+#include <thrift/TApplicationException.h>
+
+#include <condition_variable>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <shared_mutex>
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "common/logging.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
+#include "runtime/query_context.h"
+#include "service/backend_options.h"
 #include "util/debug_util.h"
+#include "util/hash_util.hpp"
 #include "util/time.h"
+#include "util/uid_util.h"
 #include "vec/core/block.h"
 
 namespace doris {
+// TODO: Currently this function is only used to report profile.
+// In the future, all exec status and query statistics should be reported
+// through this function.
+//
+// Sends `req` to the frontend at `coor_addr` through reportExecStatus and lets the
+// RPC fill `res`.
+//
+// Returns Status::OK() when the call completes without throwing, otherwise a
+// Status::RpcError (a warning is logged on every failure path). This function does
+// not look at the content of `res`; the status the frontend put into it must be
+// checked by the caller.
+static Status _do_report_exec_stats_rpc(const TNetworkAddress& coor_addr,
+                                        const TReportExecStatusParams& req,
+                                        TReportExecStatusResult& res) {
+    Status client_status;
+    FrontendServiceConnection 
rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
+                                         &client_status);
+    if (!client_status.ok()) {
+        LOG_WARNING(
+                "Could not get client rpc client of {} when reporting 
profiles, reason is {}, "
+                "not reporting, profile will be lost",
+                PrintThriftNetworkAddress(coor_addr), 
client_status.to_string());
+        return Status::RpcError("Client rpc client failed");
+    }
+
+    // The inner try retries exactly once after a transport failure: the connection is
+    // reopened and the same request is sent again. Anything thrown by that second
+    // attempt is handled by the outer catch blocks.
+    try {
+        try {
+            rpc_client->reportExecStatus(res, req);
+        } catch (const apache::thrift::transport::TTransportException& e) {
+            LOG_WARNING("Transport exception from {}, reason: {}, reopening",
+                        PrintThriftNetworkAddress(coor_addr), e.what());
+            client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
+            if (!client_status.ok()) {
+                LOG_WARNING("Reopen failed, reason: {}", 
client_status.to_string());
+                return Status::RpcError("Open rpc client failed");
+            }
+
+            rpc_client->reportExecStatus(res, req);
+        }
+    } catch (apache::thrift::TApplicationException& e) {
+        // UNKNOWN_METHOD gets its own message that points at a frontend version mismatch.
+        if (e.getType() == e.UNKNOWN_METHOD) {
+            LOG_WARNING(
+                    "Failed to report query profile to {} due to {}, usually 
because the frontend "
+                    "is not upgraded, check the version",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        } else {
+            LOG_WARNING(
+                    "Failed to report query profile to {}, reason: {}, you can 
see fe log for "
+                    "details.",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        }
+        return Status::RpcError("Send stats failed");
+    } catch (std::exception& e) {
+        LOG_WARNING(
+                "Failed to report query profile to {}, reason: {}, you can see 
fe log for details.",
+                PrintThriftNetworkAddress(coor_addr), e.what());
+        return Status::RpcError("Send report query profile failed");
+    }
+
+    return Status::OK();
+}
+
+// Packs the pipeline-x profiles of one query into a TReportExecStatusParams.
+//
+// query_id               query the profiles belong to; stored inside the TQueryProfile.
+// fragment_id_to_profile fragment id -> profiles of the pipelines of that fragment.
+// load_channel_profiles  load channel profiles to attach; the field is only set when
+//                        at least one non-null profile is present.
+//
+// Null profile pointers are reported (DCHECK + error log) and skipped.
+TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_params_x(
+        const TUniqueId& query_id,
+        const std::unordered_map<int32, std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
+                fragment_id_to_profile,
+        const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profiles) {
+    // fragment id -> one TDetailedReportParams per non-null pipeline profile
+    std::map<int32_t, std::vector<TDetailedReportParams>> params_by_fragment;
+
+    for (const auto& [fragment_id, pipeline_profiles] : fragment_id_to_profile) {
+        std::vector<TDetailedReportParams> params_of_fragment;
+
+        for (const auto& one_pipeline : pipeline_profiles) {
+            if (one_pipeline != nullptr) {
+                TDetailedReportParams param;
+                param.__set_profile(*one_pipeline);
+                // fragment_instance_id stays unset, pipeline x does not need it
+                params_of_fragment.push_back(param);
+                continue;
+            }
+
+            auto msg = fmt::format("Register fragment profile {} {} failed, profile is null",
+                                   print_id(query_id), fragment_id);
+            DCHECK(false) << msg;
+            LOG_ERROR(msg);
+        }
+
+        params_by_fragment.insert(std::make_pair(fragment_id, params_of_fragment));
+    }
+
+    if (params_by_fragment.empty()) {
+        LOG_WARNING("No fragment profile found for query {}", print_id(query_id));
+    }
+
+    std::vector<TRuntimeProfileTree> valid_load_channel_profiles;
+    for (const auto& one_load_channel : load_channel_profiles) {
+        if (one_load_channel != nullptr) {
+            valid_load_channel_profiles.push_back(*one_load_channel);
+            continue;
+        }
+
+        auto msg = fmt::format(
+                "Register fragment profile {} {} failed, load channel profile is null",
+                print_id(query_id), -1);
+        DCHECK(false) << msg;
+        LOG_ERROR(msg);
+    }
+
+    TQueryProfile query_profile;
+    query_profile.__set_query_id(query_id);
+    query_profile.__set_fragment_id_to_profile(params_by_fragment);
+    if (!valid_load_channel_profiles.empty()) {
+        query_profile.__set_load_channel_profiles(valid_load_channel_profiles);
+    }
+
+    TReportExecStatusParams req;
+    req.__set_query_profile(query_profile);
+    req.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
+    // invalid query id to avoid API compatibility during upgrade
+    req.__set_query_id(TUniqueId());
+
+    return req;
+}
+
+// Packs the per-instance profiles of one non-pipeline query into a
+// TReportExecStatusParams.
+//
+// query_id               query the profiles belong to; stored inside the TQueryProfile.
+// instance_id_to_profile fragment instance id -> profile of that instance.
+// load_channel_profile   load channel profiles to attach; the field is only set when
+//                        at least one non-null profile is present.
+//
+// fragment_instance_ids and instance_profiles are filled in lock step, so entry i of
+// one list always describes entry i of the other. Null profile pointers are reported
+// (DCHECK + error log) and skipped.
+TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_params_non_pipeline(
+        const TUniqueId& query_id,
+        const std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>&
+                instance_id_to_profile,
+        const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profile) {
+    std::vector<TUniqueId> fragment_instance_ids;
+    std::vector<TRuntimeProfileTree> instance_profiles;
+    fragment_instance_ids.reserve(instance_id_to_profile.size());
+    instance_profiles.reserve(instance_id_to_profile.size());
+
+    for (const auto& [instance_id, instance_profile] : instance_id_to_profile) {
+        if (instance_profile == nullptr) {
+            auto msg = fmt::format("Register instance profile {} {} failed, profile is null",
+                                   print_id(query_id), print_id(instance_id));
+            DCHECK(false) << msg;
+            LOG_ERROR(msg);
+            continue;
+        }
+
+        fragment_instance_ids.push_back(instance_id);
+        instance_profiles.push_back(*instance_profile);
+    }
+
+    // Forward the load channel profiles handed in by the caller; previously an
+    // always-empty local vector was sent and the argument was never read.
+    std::vector<TRuntimeProfileTree> load_channel_profiles;
+    load_channel_profiles.reserve(load_channel_profile.size());
+    for (const auto& one_load_channel : load_channel_profile) {
+        if (one_load_channel == nullptr) {
+            auto msg = fmt::format(
+                    "Register instance profile {} failed, load channel profile is null",
+                    print_id(query_id));
+            DCHECK(false) << msg;
+            LOG_ERROR(msg);
+            continue;
+        }
+
+        load_channel_profiles.push_back(*one_load_channel);
+    }
+
+    TQueryProfile profile;
+    profile.__set_query_id(query_id);
+    profile.__set_fragment_instance_ids(fragment_instance_ids);
+    profile.__set_instance_profiles(instance_profiles);
+    if (!load_channel_profiles.empty()) {
+        profile.__set_load_channel_profiles(load_channel_profiles);
+    }
+
+    TReportExecStatusParams res;
+    res.__set_query_profile(profile);
+    // invalid query id to avoid API compatibility during upgrade
+    res.__set_query_id(TUniqueId());
+    res.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
+    return res;
+}
+
+// Spawns config::report_exec_status_thread_num threads, each running
+// report_query_profiles_thread(). Only the first call has an effect; any later call
+// trips a DCHECK, logs an error and returns.
+void RuntimeQueryStatiticsMgr::start_report_thread() {
+    // exchange() does the check and the set in one atomic step, so two concurrent
+    // callers cannot both observe "not started" and spawn the threads twice.
+    if (started.exchange(true)) {
+        DCHECK(false) << "report thread has been started";
+        LOG_ERROR("report thread has been started");
+        return;
+    }
+
+    for (size_t i = 0; i < config::report_exec_status_thread_num; ++i) {
+        this->_report_profile_threads.emplace_back(std::make_unique<std::thread>(
+                &RuntimeQueryStatiticsMgr::report_query_profiles_thread, this));
+    }
+}
+
+// Body of every report thread: sleep until a profile has been registered (or stop was
+// requested), report everything that is registered, repeat. On stop the thread still
+// runs one last report pass before it returns.
+void RuntimeQueryStatiticsMgr::report_query_profiles_thread() {
+    // register_*() and _report_query_profiles_*() touch the profile maps under
+    // _query_profile_map_lock, so the emptiness check has to hold that lock as well;
+    // _report_profile_mutex alone does not protect the maps.
+    // Lock order here is _report_profile_mutex -> _query_profile_map_lock.
+    const auto has_pending_profile = [this]() {
+        std::shared_lock<std::shared_mutex> map_guard(_query_profile_map_lock);
+        return !_query_profile_map.empty() || !_profile_map_x.empty();
+    };
+
+    while (true) {
+        {
+            std::unique_lock<std::mutex> lock(_report_profile_mutex);
+
+            // The 3 second timeout bounds the delay if a notification is missed.
+            while (!_report_profile_thread_stop && !has_pending_profile()) {
+                _report_profile_cv.wait_for(lock, std::chrono::seconds(3));
+            }
+        }
+
+        _report_query_profiles_function();
+
+        {
+            std::lock_guard<std::mutex> lg(_report_profile_mutex);
+
+            if (_report_profile_thread_stop) {
+                LOG_INFO("Report profile thread stopped");
+                return;
+            }
+        }
+    }
+}
+
+// Wakes up one thread waiting on _report_profile_cv. The notification is issued while
+// _report_profile_mutex is held.
+void RuntimeQueryStatiticsMgr::trigger_report_profile() {
+    std::lock_guard<std::mutex> guard(_report_profile_mutex);
+    _report_profile_cv.notify_one();
+}
+
+// Asks all report threads to stop and blocks until each of them has exited.
+// Does nothing when start_report_thread() was never called.
+void RuntimeQueryStatiticsMgr::stop_report_thread() {
+    if (!started) {
+        return;
+    }
+
+    {
+        std::unique_lock<std::mutex> lock(_report_profile_mutex);
+        _report_profile_thread_stop = true;
+        LOG_INFO("All report threads are going to stop");
+        _report_profile_cv.notify_all();
+    }
+
+    for (const auto& thread : _report_profile_threads) {
+        // join() on an already joined thread throws std::system_error; the guard
+        // keeps a repeated stop_report_thread() call harmless.
+        if (thread != nullptr && thread->joinable()) {
+            thread->join();
+        }
+    }
+
+    LOG_INFO("All report threads stopped");
+}
+
+// Stores the profile of one fragment instance (non-pipeline engine) so that a report
+// thread can pick it up later.
+//
+// A null instance_profile is rejected (DCHECK + error log) and nothing is stored.
+// coor_addr is only recorded when this is the first instance registered for query_id.
+// An instance id that is already present keeps its existing profile.
+// load_channel_profile is optional; when non-null it is stored under instance_id.
+void RuntimeQueryStatiticsMgr::register_instance_profile(
+        const TUniqueId& query_id, const TNetworkAddress& coor_addr, const TUniqueId& instance_id,
+        std::shared_ptr<TRuntimeProfileTree> instance_profile,
+        std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
+    if (instance_profile == nullptr) {
+        auto msg = fmt::format("Register instance profile {} {} failed, profile is null",
+                               print_id(query_id), print_id(instance_id));
+        DCHECK(false) << msg;
+        LOG_ERROR(msg);
+        return;
+    }
+
+    std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
+
+    auto query_pos = _query_profile_map.find(query_id);
+    if (query_pos == _query_profile_map.end()) {
+        query_pos = _query_profile_map
+                            .emplace(query_id,
+                                     std::make_tuple(
+                                             coor_addr,
+                                             std::unordered_map<
+                                                     TUniqueId,
+                                                     std::shared_ptr<TRuntimeProfileTree>>()))
+                            .first;
+    }
+
+    auto& profiles_of_query = std::get<1>(query_pos->second);
+    profiles_of_query.insert(std::make_pair(instance_id, instance_profile));
+
+    if (load_channel_profile != nullptr) {
+        _load_channel_profile_map[instance_id] = load_channel_profile;
+    }
+
+    LOG_INFO("Register instance profile {} {}", print_id(query_id), print_id(instance_id));
+}
+
+// Stores the pipeline profiles of one pipeline-x fragment so that a report thread can
+// pick them up later.
+//
+// If any entry of p_profiles is null the whole registration is rejected (DCHECK +
+// error log) and nothing is stored.
+// coor_addr is only recorded when this is the first fragment registered for query_id.
+// A fragment id that is already present keeps its existing profiles.
+// load_channel_profile_x is optional; when non-null it is stored under
+// (query_id, fragment_id).
+void RuntimeQueryStatiticsMgr::register_fragment_profile_x(
+        const TUniqueId& query_id, const TNetworkAddress& coor_addr, int32_t fragment_id,
+        std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles,
+        std::shared_ptr<TRuntimeProfileTree> load_channel_profile_x) {
+    for (size_t idx = 0; idx < p_profiles.size(); ++idx) {
+        if (p_profiles[idx] != nullptr) {
+            continue;
+        }
+
+        auto msg = fmt::format("Register fragment profile {} {} failed, profile is null",
+                               print_id(query_id), fragment_id);
+        DCHECK(false) << msg;
+        LOG_ERROR(msg);
+        return;
+    }
+
+    std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
+
+    auto query_pos = _profile_map_x.find(query_id);
+    if (query_pos == _profile_map_x.end()) {
+        query_pos =
+                _profile_map_x
+                        .emplace(query_id,
+                                 std::make_tuple(
+                                         coor_addr,
+                                         std::unordered_map<
+                                                 int,
+                                                 std::vector<std::shared_ptr<
+                                                         TRuntimeProfileTree>>>()))
+                        .first;
+    }
+
+    auto& fragments_of_query = std::get<1>(query_pos->second);
+    fragments_of_query.insert(std::make_pair(fragment_id, p_profiles));
+
+    if (load_channel_profile_x != nullptr) {
+        _load_channel_profile_map_x[std::make_pair(query_id, fragment_id)] = load_channel_profile_x;
+    }
+
+    LOG_INFO("register x profile done {}, fragment {}, profiles {}", print_id(query_id),
+             fragment_id, p_profiles.size());
+}
+
+// Reports every registered non-pipeline profile to the coordinator of its query.
+//
+// The registered maps are swapped out under _query_profile_map_lock, so the RPCs run
+// without the lock held and each profile is sent at most once. A profile whose RPC
+// fails is not re-queued; the failure is only logged.
+void RuntimeQueryStatiticsMgr::_report_query_profiles_non_pipeline() {
+    // query_id -> {coordinator_addr, {instance_id -> instance_profile}}
+    decltype(_query_profile_map) profile_copy;
+    decltype(_load_channel_profile_map) load_channel_profile_copy;
+
+    {
+        std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
+        _query_profile_map.swap(profile_copy);
+        _load_channel_profile_map.swap(load_channel_profile_copy);
+    }
+
+    for (const auto& entry : profile_copy) {
+        const auto& query_id = entry.first;
+        const auto& coor_addr = std::get<0>(entry.second);
+        const auto& instance_id_to_profile = std::get<1>(entry.second);
+
+        // Load channel profiles are keyed by instance id. Only attach those whose
+        // instance belongs to this query, otherwise every query would receive the
+        // load channel profiles of all other queries too.
+        std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles;
+        for (const auto& load_channel_entry : load_channel_profile_copy) {
+            if (!instance_id_to_profile.contains(load_channel_entry.first)) {
+                continue;
+            }
+
+            if (load_channel_entry.second == nullptr) {
+                auto msg = fmt::format(
+                        "Register fragment profile {} {} failed, load channel profile is null",
+                        print_id(query_id), -1);
+                DCHECK(false) << msg;
+                LOG_ERROR(msg);
+                continue;
+            }
+
+            load_channel_profiles.push_back(load_channel_entry.second);
+        }
+
+        // Null instance profiles are reported and skipped while the request is built.
+        TReportExecStatusParams req = create_report_exec_status_params_non_pipeline(
+                query_id, instance_id_to_profile, load_channel_profiles);
+        TReportExecStatusResult res;
+        const Status rpc_status = _do_report_exec_stats_rpc(coor_addr, req, res);
+
+        // `res` is only meaningful when the RPC itself went through, so the transport
+        // status has to be checked before the status returned by the frontend.
+        if (!rpc_status.ok()) {
+            LOG_WARNING("Query {} send profile to {} failed, msg: {}", print_id(query_id),
+                        PrintThriftNetworkAddress(coor_addr), rpc_status.to_string());
+        } else if (res.status.status_code != TStatusCode::OK) {
+            std::stringstream ss;
+            res.status.printTo(ss);
+            LOG_WARNING("Query {} send profile to {} failed, msg: {}", print_id(query_id),
+                        PrintThriftNetworkAddress(coor_addr), ss.str());
+        } else {
+            LOG_INFO("Send {} profile finished", print_id(query_id));
+        }
+    }
+}
+
+// Reports every registered pipeline-x profile to the coordinator of its query.
+//
+// The registered maps are swapped out under _query_profile_map_lock, so the RPCs run
+// without the lock held and each profile is sent at most once. A profile whose RPC
+// fails is not re-queued; the failure is only logged.
+void RuntimeQueryStatiticsMgr::_report_query_profiles_x() {
+    decltype(_profile_map_x) profile_copy;
+    decltype(_load_channel_profile_map_x) load_channel_profile_copy;
+
+    {
+        std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
+        _profile_map_x.swap(profile_copy);
+        _load_channel_profile_map_x.swap(load_channel_profile_copy);
+    }
+
+    // query_id -> {coordinator_addr, {fragment_id -> std::vector<pipeline_profile>}}
+    for (const auto& entry : profile_copy) {
+        const auto& query_id = entry.first;
+        const auto& coor_addr = std::get<0>(entry.second);
+        const auto& fragment_profile_map = std::get<1>(entry.second);
+
+        if (fragment_profile_map.empty()) {
+            auto msg = fmt::format("Query {} does not have profile", print_id(query_id));
+            DCHECK(false) << msg;
+            LOG_ERROR(msg);
+            continue;
+        }
+
+        // Load channel profiles are keyed by (query_id, fragment_id). Only attach the
+        // ones of this query, otherwise every query would receive the load channel
+        // profiles of all other queries too.
+        std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles;
+        for (const auto& load_channel_entry : load_channel_profile_copy) {
+            if (!(load_channel_entry.first.first == query_id)) {
+                continue;
+            }
+
+            if (load_channel_entry.second == nullptr) {
+                auto msg = fmt::format(
+                        "Register fragment profile {} {} failed, load channel profile is null",
+                        print_id(query_id), load_channel_entry.first.second);
+                DCHECK(false) << msg;
+                LOG_ERROR(msg);
+                continue;
+            }
+
+            load_channel_profiles.push_back(load_channel_entry.second);
+        }
+
+        TReportExecStatusParams req = create_report_exec_status_params_x(
+                query_id, fragment_profile_map, load_channel_profiles);
+        TReportExecStatusResult res;
+
+        const Status rpc_status = _do_report_exec_stats_rpc(coor_addr, req, res);
+
+        // Both layers can fail: the RPC itself (rpc_status) and the handling on the
+        // frontend (res.status). `res` is only meaningful when the RPC went through.
+        if (!rpc_status.ok() || res.status.status_code != TStatusCode::OK) {
+            LOG_WARNING("Query {} send profile to {} failed", print_id(query_id),
+                        PrintThriftNetworkAddress(coor_addr));
+        } else {
+            LOG_INFO("Send {} profile succeed", print_id(query_id));
+        }
+    }
+}
 
 void QueryStatisticsCtx::collect_query_statistics(TQueryStatistics* tq_s) {
     QueryStatistics tmp_qs;
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h 
b/be/src/runtime/runtime_query_statistics_mgr.h
index 1b3e164d48f..088dd39be55 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -18,12 +18,22 @@
 #pragma once
 
 #include <gen_cpp/Data_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Types_types.h>
 
+#include <condition_variable>
+#include <cstdint>
+#include <memory>
+#include <mutex>
 #include <shared_mutex>
 #include <string>
+#include <thread>
+#include <unordered_map>
 
+#include "gutil/integral_types.h"
 #include "runtime/query_statistics.h"
 #include "runtime/workload_management/workload_condition.h"
+#include "util/hash_util.hpp"
 #include "util/time.h"
 
 namespace doris {
@@ -57,6 +67,18 @@ public:
     RuntimeQueryStatiticsMgr() = default;
     ~RuntimeQueryStatiticsMgr() = default;
 
+    static TReportExecStatusParams create_report_exec_status_params_x(
+            const TUniqueId& q_id,
+            const std::unordered_map<int32, 
std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
+                    fragment_id_to_profile,
+            const std::vector<std::shared_ptr<TRuntimeProfileTree>>& 
load_channel_profile);
+
+    static TReportExecStatusParams 
create_report_exec_status_params_non_pipeline(
+            const TUniqueId& q_id,
+            const std::unordered_map<TUniqueId, 
std::shared_ptr<TRuntimeProfileTree>>&
+                    instance_id_to_profile,
+            const std::vector<std::shared_ptr<TRuntimeProfileTree>>& 
load_channel_profile);
+
     void register_query_statistics(std::string query_id, 
std::shared_ptr<QueryStatistics> qs_ptr,
                                    TNetworkAddress fe_addr);
 
@@ -75,9 +97,57 @@ public:
     // used for backend_active_tasks
     void get_active_be_tasks_block(vectorized::Block* block);
 
+    void start_report_thread();
+    void report_query_profiles_thread();
+    void trigger_report_profile();
+    void stop_report_thread();
+
+    void register_instance_profile(const TUniqueId& query_id, const 
TNetworkAddress& coor_addr,
+                                   const TUniqueId& instance_id,
+                                   std::shared_ptr<TRuntimeProfileTree> 
instance_profile,
+                                   std::shared_ptr<TRuntimeProfileTree> 
load_channel_profile);
+
+    // Registers the pipeline profiles of one pipeline-x fragment of query `query_id`;
+    // `coor_addr` is the address the profiles of this query are reported to.
+    // `load_channel_profile_x` may be null.
+    void register_fragment_profile_x(const TUniqueId& query_id, const TNetworkAddress& coor_addr,
+                                     int32_t fragment_id,
+                                     std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles,
+                                     std::shared_ptr<TRuntimeProfileTree> load_channel_profile_x);
+
 private:
     std::shared_mutex _qs_ctx_map_lock;
     std::map<std::string, std::unique_ptr<QueryStatisticsCtx>> 
_query_statistics_ctx_map;
+
+    std::mutex _report_profile_mutex;
+    std::atomic_bool started = false;
+    std::vector<std::unique_ptr<std::thread>> _report_profile_threads;
+    std::condition_variable _report_profile_cv;
+    bool _report_profile_thread_stop = false;
+
+    // One report pass: pipeline-x profiles are reported first, then the
+    // non-pipeline ones.
+    void _report_query_profiles_function() {
+        _report_query_profiles_x();
+        _report_query_profiles_non_pipeline();
+    }
+
+    void _report_query_profiles_x();
+    void _report_query_profiles_non_pipeline();
+
+    std::shared_mutex _query_profile_map_lock;
+
+    // query_id -> {coordinator_addr, {fragment_id -> std::vector<pipeline_profile>}}
+    std::unordered_map<
+            TUniqueId,
+            std::tuple<TNetworkAddress,
+                       std::unordered_map<int, 
std::vector<std::shared_ptr<TRuntimeProfileTree>>>>>
+            _profile_map_x;
+    std::unordered_map<std::pair<TUniqueId, int32_t>, 
std::shared_ptr<TRuntimeProfileTree>>
+            _load_channel_profile_map_x;
+
+    // query_id -> {coordinator_addr, {instance_id -> instance_profile}}
+    std::unordered_map<
+            TUniqueId,
+            std::tuple<TNetworkAddress,
+                       std::unordered_map<TUniqueId, 
std::shared_ptr<TRuntimeProfileTree>>>>
+            _query_profile_map;
+    std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> 
_load_channel_profile_map;
 };
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/service/backend_service.cpp 
b/be/src/service/backend_service.cpp
index b1a110144ef..8aab496ed2f 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -23,6 +23,7 @@
 #include <gen_cpp/BackendService_types.h>
 #include <gen_cpp/Data_types.h>
 #include <gen_cpp/DorisExternalService_types.h>
+#include <gen_cpp/FrontendService_types.h>
 #include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/Planner_types.h>
 #include <gen_cpp/Status_types.h>
@@ -1169,4 +1170,29 @@ void 
BaseBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
     response.__set_status(Status::NotSupported("warm_up_tablets is not 
implemented").to_thrift());
 }
 
+// Thrift handler: fills `response` with the current exec status (profile) of the
+// query identified by `request.id`.
+//
+// The response status is InvalidArgument when the request carries no id, the status
+// returned by FragmentMgr::get_realtime_exec_status when that call fails, and OK
+// together with the collected params otherwise.
+void BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse& response,
+                                                  const TGetRealtimeExecStatusRequest& request) {
+    if (!request.__isset.id) {
+        LOG_WARNING("Invalidate argument, id is empty");
+        response.__set_status(Status::InvalidArgument("id is empty").to_thrift());
+        // Stop here: without the return the lookup below would run with an unset id
+        // and overwrite the error status.
+        return;
+    }
+
+    LOG_INFO("Getting realtime exec status of query {}", print_id(request.id));
+    std::unique_ptr<TReportExecStatusParams> report_exec_status_params =
+            std::make_unique<TReportExecStatusParams>();
+    Status st = ExecEnv::GetInstance()->fragment_mgr()->get_realtime_exec_status(
+            request.id, report_exec_status_params.get());
+
+    if (!st.ok()) {
+        response.__set_status(st.to_thrift());
+        return;
+    }
+
+    // The query id inside the params is reset to a default-constructed TUniqueId.
+    report_exec_status_params->__set_query_id(TUniqueId());
+
+    response.__set_status(Status::OK().to_thrift());
+    response.__set_report_exec_status_params(*report_exec_status_params);
+}
+
 } // namespace doris
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index 20aaa96685a..b670b21221e 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -137,6 +137,9 @@ public:
     void query_ingest_binlog(TQueryIngestBinlogResult& result,
                              const TQueryIngestBinlogRequest& request) 
override;
 
+    void get_realtime_exec_status(TGetRealtimeExecStatusResponse& response,
+                                  const TGetRealtimeExecStatusRequest& 
request) override;
+
     
////////////////////////////////////////////////////////////////////////////
     // begin cloud backend functions
     
////////////////////////////////////////////////////////////////////////////
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index d1db6ff43e2..bdd51a1a8cc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -19,6 +19,7 @@ package org.apache.doris.common.profile;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.Status;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.TimeUtils;
@@ -27,7 +28,10 @@ import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TDetailedReportParams;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TQueryProfile;
 import org.apache.doris.thrift.TReportExecStatusParams;
+import org.apache.doris.thrift.TRuntimeProfileTree;
+import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.thrift.TUnit;
 
@@ -245,6 +249,81 @@ public class ExecutionProfile {
         }
     }
 
+    /**
+     * Merges a profile reported by one backend into this execution profile.
+     *
+     * <p>For a pipeline-x profile every reported pipeline becomes a new child node of
+     * its fragment profile and the nodes are recorded per backend in multiBeProfile.
+     * Otherwise each reported instance profile is merged into the already existing
+     * profile of that instance. Load channel profiles, when present, are merged in
+     * both cases.
+     *
+     * <p>Processing stops at the first invalid entry; entries handled before it stay
+     * applied.
+     *
+     * @param profile profile sent by the backend
+     * @param backendHBAddress heartbeat address of the reporting backend
+     * @return OK on success, INVALID_ARGUMENT when a required field is missing or an
+     *         instance cannot be matched
+     */
+    public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddress) {
+        if (!isPipelineXProfile) {
+            if (!profile.isSetInstanceProfiles() || !profile.isSetFragmentInstanceIds()) {
+                return new Status(TStatusCode.INVALID_ARGUMENT, "InstanceIdToProfile is not set");
+            }
+
+            List<TUniqueId> reportedInstanceIds = profile.getFragmentInstanceIds();
+            List<TRuntimeProfileTree> reportedProfiles = profile.getInstanceProfiles();
+            if (reportedInstanceIds.size() != reportedProfiles.size()) {
+                return new Status(TStatusCode.INVALID_ARGUMENT, "InstanceIdToProfile size is not equal");
+            }
+
+            for (int pos = 0; pos < reportedInstanceIds.size(); pos++) {
+                TUniqueId instanceId = reportedInstanceIds.get(pos);
+                TRuntimeProfileTree reported = reportedProfiles.get(pos);
+                if (reported == null) {
+                    return new Status(TStatusCode.INVALID_ARGUMENT, "Profile is not set");
+                }
+
+                PlanFragmentId owningFragment = instanceIdToFragmentId.get(instanceId);
+                if (owningFragment == null) {
+                    LOG.warn("Could not find related fragment for instance {}",
+                            DebugUtil.printId(instanceId));
+                    return new Status(TStatusCode.INVALID_ARGUMENT, "Could not find related fragment");
+                }
+
+                // Do not use fragment id in params, because non-pipeline engine will set it to -1
+                Map<TUniqueId, RuntimeProfile> profilesOfFragment = fragmentInstancesProfiles.get(owningFragment);
+                if (profilesOfFragment == null) {
+                    LOG.warn("Could not find related instances for fragment {}", owningFragment);
+                    return new Status(TStatusCode.INVALID_ARGUMENT, "Could not find related instance");
+                }
+
+                RuntimeProfile target = profilesOfFragment.get(instanceId);
+                if (target == null) {
+                    LOG.warn("Could not find related profile {}", DebugUtil.printId(instanceId));
+                    return new Status(TStatusCode.INVALID_ARGUMENT, "Could not find related instance");
+                }
+
+                target.update(reported);
+            }
+        } else {
+            if (!profile.isSetFragmentIdToProfile()) {
+                return new Status(TStatusCode.INVALID_ARGUMENT, "FragmentIdToProfile is not set");
+            }
+
+            for (Entry<Integer, List<TDetailedReportParams>> fragmentEntry
+                    : profile.getFragmentIdToProfile().entrySet()) {
+                int fragmentId = fragmentEntry.getKey();
+                List<RuntimeProfile> pipelineNodes = Lists.newArrayList();
+                int pipelineIdx = 0;
+                for (TDetailedReportParams pipelineProfile : fragmentEntry.getValue()) {
+                    String name = "Pipeline :" + pipelineIdx + " "
+                            + " (host=" + backendHBAddress + ")";
+                    RuntimeProfile pipelineNode = new RuntimeProfile(name);
+                    pipelineNodes.add(pipelineNode);
+                    if (!pipelineProfile.isSetProfile()) {
+                        return new Status(TStatusCode.INVALID_ARGUMENT, "Profile is not set");
+                    }
+
+                    pipelineNode.update(pipelineProfile.profile);
+                    pipelineIdx++;
+                    fragmentProfiles.get(fragmentId).addChild(pipelineNode);
+                }
+                multiBeProfile.get(fragmentId).put(backendHBAddress, pipelineNodes);
+            }
+        }
+
+        if (profile.isSetLoadChannelProfiles()) {
+            for (TRuntimeProfileTree reportedLoadChannel : profile.getLoadChannelProfiles()) {
+                this.loadChannelProfile.update(reportedLoadChannel);
+            }
+        }
+
+        return new Status(TStatusCode.OK, "Success");
+    }
+
     public void updateProfile(TReportExecStatusParams params) {
         Backend backend  = null;
         if (params.isSetBackendId()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index a4cd867a31e..eca02175fcf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -19,6 +19,7 @@ package org.apache.doris.qe;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.Status;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.ExecutionProfile;
@@ -26,7 +27,9 @@ import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.ProfileManager;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
+import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TQueryProfile;
 import org.apache.doris.thrift.TQueryType;
 import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TReportExecStatusResult;
@@ -71,6 +74,27 @@ public final class QeProcessorImpl implements QeProcessor {
                 "profile-write-pool", true);
     }
 
+    /**
+     * Handles a query profile reported by a backend: looks up the execution profile of
+     * the query and schedules the merge on the profile write pool.
+     *
+     * @param profile profile sent by the backend
+     * @param address heartbeat address of the reporting backend
+     * @return NOT_FOUND when no execution profile exists for the query, otherwise OK.
+     *         OK only means the update was scheduled; the update runs asynchronously
+     *         and its failures are logged, not returned.
+     */
+    private Status processQueryProfile(TQueryProfile profile, TNetworkAddress address) {
+        LOG.info("New profile processing API, query {}", DebugUtil.printId(profile.query_id));
+
+        ExecutionProfile executionProfile = ProfileManager.getInstance().getExecutionProfile(profile.query_id);
+        if (executionProfile == null) {
+            LOG.warn("Could not find execution profile with query id {}", DebugUtil.printId(profile.query_id));
+            return new Status(TStatusCode.NOT_FOUND, "Could not find execution profile with query id "
+                    + DebugUtil.printId(profile.query_id));
+        }
+
+        // Update profile may cost a lot of time, use a separate pool to deal with it.
+        writeProfileExecutor.submit(() -> {
+            // Nobody reads the Future returned by submit(), so both a non-OK status and
+            // a thrown exception would vanish silently unless they are logged here.
+            try {
+                Status updateStatus = executionProfile.updateProfile(profile, address);
+                if (!updateStatus.ok()) {
+                    LOG.warn("Failed to update profile of query {}, reason: {}",
+                            DebugUtil.printId(profile.query_id), updateStatus.getErrorMsg());
+                }
+            } catch (Exception e) {
+                LOG.warn("Exception while updating profile of query {}",
+                        DebugUtil.printId(profile.query_id), e);
+            }
+        });
+
+        return Status.OK;
+    }
+
     @Override
     public Coordinator getCoordinator(TUniqueId queryId) {
         QueryInfo queryInfo = coordinatorMap.get(queryId);
@@ -206,6 +230,15 @@ public final class QeProcessorImpl implements QeProcessor {
 
     @Override
     public TReportExecStatusResult reportExecStatus(TReportExecStatusParams 
params, TNetworkAddress beAddr) {
+        if (params.isSetQueryProfile()) {
+            if (params.isSetBackendId()) {
+                Backend backend = 
Env.getCurrentSystemInfo().getBackend(params.getBackendId());
+                if (backend != null) {
+                    processQueryProfile(params.getQueryProfile(), 
backend.getHeartbeatAddress());
+                }
+            }
+        }
+
         if (params.isSetProfile() || params.isSetLoadChannelProfile()) {
             LOG.info("ReportExecStatus(): fragment_instance_id={}, query 
id={}, backend num: {}, ip: {}",
                     DebugUtil.printId(params.fragment_instance_id), 
DebugUtil.printId(params.query_id),
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
index eb9ac858b3e..81d444a716a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
@@ -31,6 +31,8 @@ import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TExecPlanFragmentResult;
 import org.apache.doris.thrift.TExportStatusResult;
 import org.apache.doris.thrift.TExportTaskRequest;
+import org.apache.doris.thrift.TGetRealtimeExecStatusRequest;
+import org.apache.doris.thrift.TGetRealtimeExecStatusResponse;
 import org.apache.doris.thrift.TGetTopNHotPartitionsRequest;
 import org.apache.doris.thrift.TGetTopNHotPartitionsResponse;
 import org.apache.doris.thrift.TIngestBinlogRequest;
@@ -273,6 +275,12 @@ public class GenericPoolTest {
         public TWarmUpTabletsResponse warmUpTablets(TWarmUpTabletsRequest 
request) throws TException {
             return null;
         }
+
+        @Override
+        public TGetRealtimeExecStatusResponse 
getRealtimeExecStatus(TGetRealtimeExecStatusRequest request)
+                throws TException {
+            return null;
+        }
     }
 
     @Test
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index c705893c672..441595511dc 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -48,6 +48,8 @@ import org.apache.doris.thrift.TExportState;
 import org.apache.doris.thrift.TExportStatusResult;
 import org.apache.doris.thrift.TExportTaskRequest;
 import org.apache.doris.thrift.TFinishTaskRequest;
+import org.apache.doris.thrift.TGetRealtimeExecStatusRequest;
+import org.apache.doris.thrift.TGetRealtimeExecStatusResponse;
 import org.apache.doris.thrift.TGetTopNHotPartitionsRequest;
 import org.apache.doris.thrift.TGetTopNHotPartitionsResponse;
 import org.apache.doris.thrift.THeartbeatResult;
@@ -456,6 +458,12 @@ public class MockedBackendFactory {
                 throws TException {
             return null;
         }
+
+        @Override
+        public TGetRealtimeExecStatusResponse 
getRealtimeExecStatus(TGetRealtimeExecStatusRequest request)
+                throws TException {
+            return null;
+        }
     }
 
     // The default Brpc service.
diff --git a/gensrc/thrift/BackendService.thrift 
b/gensrc/thrift/BackendService.thrift
index fb45ed24809..69918985a16 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -24,6 +24,7 @@ include "PlanNodes.thrift"
 include "AgentService.thrift"
 include "PaloInternalService.thrift"
 include "DorisExternalService.thrift"
+include "FrontendService.thrift"
 
 struct TExportTaskRequest {
     1: required PaloInternalService.TExecPlanFragmentParams params
@@ -319,6 +320,16 @@ struct TPublishTopicResult {
     1: required Status.TStatus status
 }
 
+struct TGetRealtimeExecStatusRequest {
+    // May be a query id or some other unique id
+    1: optional Types.TUniqueId id
+}
+
+struct TGetRealtimeExecStatusResponse {
+    1: optional Status.TStatus status
+    2: optional FrontendService.TReportExecStatusParams 
report_exec_status_params
+}
+
 service BackendService {
     // Called by coord to start asynchronous execution of plan fragment in 
backend.
     // Returns as soon as all incoming data streams have been set up.
@@ -387,4 +398,6 @@ service BackendService {
     TQueryIngestBinlogResult query_ingest_binlog(1: TQueryIngestBinlogRequest 
query_ingest_binlog_request);
 
     TPublishTopicResult publish_topic_info(1:TPublishTopicRequest 
topic_request);
+
+    TGetRealtimeExecStatusResponse 
get_realtime_exec_status(1:TGetRealtimeExecStatusRequest request);
 }
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 61d34d04184..09cd2bf0be2 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -416,6 +416,20 @@ struct TReportWorkloadRuntimeStatusParams {
     2: optional map<string, TQueryStatistics> query_statistics_map
 }
 
+struct TQueryProfile {
+    1: optional Types.TUniqueId query_id
+
+    2: optional map<i32, list<TDetailedReportParams>> fragment_id_to_profile
+
+    // Types.TUniqueId should not be used as a key in a thrift map, so we use two 
lists instead
+    // https://thrift.apache.org/docs/types#containers
+    3: optional list<Types.TUniqueId> fragment_instance_ids
+    // Types.TUniqueId should not be used as a key in a thrift map, so we use two 
lists instead
+    4: optional list<RuntimeProfile.TRuntimeProfileTree> instance_profiles
+
+    5: optional list<RuntimeProfile.TRuntimeProfileTree> load_channel_profiles
+}
+
 // The results of an INSERT query, sent to the coordinator as part of
 // TReportExecStatusParams
 struct TReportExecStatusParams {
@@ -443,7 +457,7 @@ struct TReportExecStatusParams {
   // cumulative profile
   // required in V1
   // Move to TDetailedReportParams for pipelineX
-  7: optional RuntimeProfile.TRuntimeProfileTree profile
+  7: optional RuntimeProfile.TRuntimeProfileTree profile // to be deprecated
 
   // New errors that have not been reported to the coordinator
   // optional in V1
@@ -473,17 +487,19 @@ struct TReportExecStatusParams {
   20: optional PaloInternalService.TQueryType query_type
 
   // Move to TDetailedReportParams for pipelineX
-  21: optional RuntimeProfile.TRuntimeProfileTree loadChannelProfile
+  21: optional RuntimeProfile.TRuntimeProfileTree loadChannelProfile // to be 
deprecated
 
   22: optional i32 finished_scan_ranges
 
-  23: optional list<TDetailedReportParams> detailed_report
+  23: optional list<TDetailedReportParams> detailed_report // to be deprecated
 
   24: optional TQueryStatistics query_statistics // deprecated
 
   25: optional TReportWorkloadRuntimeStatusParams 
report_workload_runtime_status
 
   26: optional list<DataSinks.THivePartitionUpdate> hive_partition_updates
+
+  27: optional TQueryProfile query_profile
 }
 
 struct TFeResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to