This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 174eea7246e (chore)[profile] remove non-pipeline logical from profile 
(#35877)
174eea7246e is described below

commit 174eea7246e13ae44f966462b9e93a8c6be5df61
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Wed Jun 5 09:44:54 2024 +0800

    (chore)[profile] remove non-pipeline logical from profile (#35877)
    
    Remove the code for non-pipeline profile processing.
---
 be/src/pipeline/pipeline_fragment_context.cpp      |   4 +-
 be/src/runtime/fragment_mgr.cpp                    |   2 +-
 be/src/runtime/query_context.cpp                   |  69 ++----
 be/src/runtime/query_context.h                     |  26 +--
 be/src/runtime/runtime_query_statistics_mgr.cpp    | 155 +------------
 be/src/runtime/runtime_query_statistics_mgr.h      |  42 +---
 .../doris/common/profile/ExecutionProfile.java     | 250 ++++++---------------
 .../org/apache/doris/common/profile/Profile.java   |   3 -
 .../main/java/org/apache/doris/qe/Coordinator.java |  15 +-
 9 files changed, 109 insertions(+), 457 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index a9db08d0d70..9ddbd1b9150 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1557,8 +1557,8 @@ void PipelineFragmentContext::_close_fragment_instance() {
     }
 
     if (_query_ctx->enable_profile()) {
-        _query_ctx->add_fragment_profile_x(_fragment_id, 
collect_realtime_profile_x(),
-                                           
collect_realtime_load_channel_profile_x());
+        _query_ctx->add_fragment_profile(_fragment_id, 
collect_realtime_profile_x(),
+                                         
collect_realtime_load_channel_profile_x());
     }
 
     // all submitted tasks done
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index fc9fbc9764a..bb3ef4ff0f8 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1261,7 +1261,7 @@ Status FragmentMgr::get_realtime_exec_status(const 
TUniqueId& query_id,
     }
 
     if (query_context->enable_pipeline_x_exec()) {
-        *exec_status = query_context->get_realtime_exec_status_x();
+        *exec_status = query_context->get_realtime_exec_status();
     }
 
     return Status::OK();
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index dcc74c40e1c..a8efd4d9392 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -353,7 +353,7 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& 
tg) {
     return Status::OK();
 }
 
-void QueryContext::add_fragment_profile_x(
+void QueryContext::add_fragment_profile(
         int fragment_id, const 
std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles,
         std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
     if (pipeline_profiles.empty()) {
@@ -375,70 +375,27 @@ void QueryContext::add_fragment_profile_x(
     LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline 
profile count {} ",
              print_id(this->_query_id), fragment_id, pipeline_profiles.size());
 
-    _profile_map_x.insert(std::make_pair(fragment_id, pipeline_profiles));
+    _profile_map.insert(std::make_pair(fragment_id, pipeline_profiles));
 
     if (load_channel_profile != nullptr) {
-        _load_channel_profile_map_x.insert(std::make_pair(fragment_id, 
load_channel_profile));
-    }
-}
-
-void QueryContext::add_instance_profile(const TUniqueId& instance_id,
-                                        std::shared_ptr<TRuntimeProfileTree> 
profile,
-                                        std::shared_ptr<TRuntimeProfileTree> 
load_channel_profile) {
-    DCHECK(profile != nullptr) << print_id(instance_id);
-
-    std::lock_guard<std::mutex> lg(_profile_mutex);
-    _profile_map.insert(std::make_pair(instance_id, profile));
-    if (load_channel_profile != nullptr) {
-        _load_channel_profile_map.insert(std::make_pair(instance_id, 
load_channel_profile));
+        _load_channel_profile_map.insert(std::make_pair(fragment_id, 
load_channel_profile));
     }
 }
 
 void QueryContext::_report_query_profile() {
-    _report_query_profile_x();
-    _report_query_profile_non_pipeline();
-}
-
-void QueryContext::_report_query_profile_non_pipeline() {
-    if (enable_pipeline_x_exec()) {
-        return;
-    }
-
-    std::lock_guard<std::mutex> lg(_profile_mutex);
-    LOG_INFO("Query {}, register query profile, instance profile count {}", 
print_id(_query_id),
-             _profile_map.size());
-
-    for (auto& [instance_id, instance_profile] : _profile_map) {
-        std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;
-        if (_load_channel_profile_map.contains(instance_id)) {
-            load_channel_profile = _load_channel_profile_map[instance_id];
-        }
-
-        
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_instance_profile(
-                _query_id, this->coord_addr, instance_id, instance_profile, 
load_channel_profile);
-    }
-
-    
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile();
-}
-
-void QueryContext::_report_query_profile_x() {
-    if (!enable_pipeline_x_exec()) {
-        return;
-    }
-
     std::lock_guard<std::mutex> lg(_profile_mutex);
     LOG_INFO(
             "Pipeline x query context, register query profile, query {}, 
fragment profile count {}",
-            print_id(_query_id), _profile_map_x.size());
+            print_id(_query_id), _profile_map.size());
 
-    for (auto& [fragment_id, fragment_profile] : _profile_map_x) {
+    for (auto& [fragment_id, fragment_profile] : _profile_map) {
         std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;
 
-        if (_load_channel_profile_map_x.contains(fragment_id)) {
-            load_channel_profile = _load_channel_profile_map_x[fragment_id];
+        if (_load_channel_profile_map.contains(fragment_id)) {
+            load_channel_profile = _load_channel_profile_map[fragment_id];
         }
 
-        
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile_x(
+        
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile(
                 _query_id, this->coord_addr, fragment_id, fragment_profile, 
load_channel_profile);
     }
 
@@ -446,7 +403,7 @@ void QueryContext::_report_query_profile_x() {
 }
 
 std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
-QueryContext::_collect_realtime_query_profile_x() const {
+QueryContext::_collect_realtime_query_profile() const {
     std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> 
res;
 
     if (!enable_pipeline_x_exec()) {
@@ -482,20 +439,20 @@ QueryContext::_collect_realtime_query_profile_x() const {
     return res;
 }
 
-TReportExecStatusParams QueryContext::get_realtime_exec_status_x() const {
+TReportExecStatusParams QueryContext::get_realtime_exec_status() const {
     TReportExecStatusParams exec_status;
 
     if (enable_pipeline_x_exec()) {
-        auto realtime_query_profile = _collect_realtime_query_profile_x();
+        auto realtime_query_profile = _collect_realtime_query_profile();
         std::vector<std::shared_ptr<TRuntimeProfileTree>> 
load_channel_profiles;
 
-        for (auto load_channel_profile : _load_channel_profile_map_x) {
+        for (auto load_channel_profile : _load_channel_profile_map) {
             if (load_channel_profile.second != nullptr) {
                 load_channel_profiles.push_back(load_channel_profile.second);
             }
         }
 
-        exec_status = 
RuntimeQueryStatiticsMgr::create_report_exec_status_params_x(
+        exec_status = 
RuntimeQueryStatiticsMgr::create_report_exec_status_params(
                 this->_query_id, std::move(realtime_query_profile),
                 std::move(load_channel_profiles), /*is_done=*/false);
     } else {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 318afd69187..dc7ea7e29bf 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -322,7 +322,7 @@ private:
 
     std::mutex _profile_mutex;
 
-    // when fragment of pipeline x is closed, it will register its profile to 
this map by using add_fragment_profile_x
+    // when fragment of pipeline is closed, it will register its profile to 
this map by using add_fragment_profile
     // flatten profile of one fragment:
     // Pipeline 0
     //      PipelineTask 0
@@ -339,34 +339,22 @@ private:
     //      PipelineTask 3
     //              Operator 3
     // fragment_id -> list<profile>
-    std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> 
_profile_map_x;
-    std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>> 
_load_channel_profile_map_x;
-
-    // instance_id -> profile
-    std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> 
_profile_map;
-    std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> 
_load_channel_profile_map;
+    std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> 
_profile_map;
+    std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>> 
_load_channel_profile_map;
 
     void _report_query_profile();
-    void _report_query_profile_non_pipeline();
-    void _report_query_profile_x();
 
     std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
-    _collect_realtime_query_profile_x() const;
-
-    std::unordered_map<TUniqueId, 
std::vector<std::shared_ptr<TRuntimeProfileTree>>>
-    _collect_realtime_query_profile_non_pipeline() const;
+    _collect_realtime_query_profile() const;
 
 public:
-    // when fragment of pipeline x is closed, it will register its profile to 
this map by using add_fragment_profile_x
-    void add_fragment_profile_x(
+    // when fragment of pipeline is closed, it will register its profile to 
this map by using add_fragment_profile
+    void add_fragment_profile(
             int fragment_id,
             const std::vector<std::shared_ptr<TRuntimeProfileTree>>& 
pipeline_profile,
             std::shared_ptr<TRuntimeProfileTree> load_channel_profile);
 
-    void add_instance_profile(const TUniqueId& iid, 
std::shared_ptr<TRuntimeProfileTree> profile,
-                              std::shared_ptr<TRuntimeProfileTree> 
load_channel_profile);
-
-    TReportExecStatusParams get_realtime_exec_status_x() const;
+    TReportExecStatusParams get_realtime_exec_status() const;
 
     bool enable_profile() const {
         return _query_options.__isset.enable_profile && 
_query_options.enable_profile;
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp 
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 3e3dd3de2dd..dda41936284 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -101,7 +101,7 @@ static Status _do_report_exec_stats_rpc(const 
TNetworkAddress& coor_addr,
     return Status::OK();
 }
 
-TReportExecStatusParams 
RuntimeQueryStatiticsMgr::create_report_exec_status_params_x(
+TReportExecStatusParams 
RuntimeQueryStatiticsMgr::create_report_exec_status_params(
         const TUniqueId& query_id,
         std::unordered_map<int32, 
std::vector<std::shared_ptr<TRuntimeProfileTree>>>
                 fragment_id_to_profile,
@@ -169,47 +169,6 @@ TReportExecStatusParams 
RuntimeQueryStatiticsMgr::create_report_exec_status_para
     return req;
 }
 
-TReportExecStatusParams 
RuntimeQueryStatiticsMgr::create_report_exec_status_params_non_pipeline(
-        const TUniqueId& query_id,
-        const std::unordered_map<TUniqueId, 
std::shared_ptr<TRuntimeProfileTree>>&
-                instance_id_to_profile,
-        const std::vector<std::shared_ptr<TRuntimeProfileTree>>& 
load_channel_profile,
-        bool is_done) {
-    TQueryProfile profile;
-    std::vector<TUniqueId> fragment_instance_ids;
-    std::vector<TRuntimeProfileTree> instance_profiles;
-    std::vector<TRuntimeProfileTree> load_channel_profiles;
-
-    for (auto entry : instance_id_to_profile) {
-        TUniqueId instance_id = entry.first;
-        std::shared_ptr<TRuntimeProfileTree> profile = entry.second;
-
-        if (profile == nullptr) {
-            auto msg = fmt::format("Register instance profile {} {} failed, 
profile is null",
-                                   print_id(query_id), print_id(instance_id));
-            DCHECK(false) << msg;
-            LOG_ERROR(msg);
-            continue;
-        }
-
-        fragment_instance_ids.push_back(instance_id);
-        instance_profiles.push_back(*profile);
-    }
-
-    profile.__set_query_id(query_id);
-    profile.__set_fragment_instance_ids(fragment_instance_ids);
-    profile.__set_instance_profiles(instance_profiles);
-    profile.__set_load_channel_profiles(load_channel_profiles);
-
-    TReportExecStatusParams res;
-    res.__set_query_profile(profile);
-    // invalid query id to avoid API compatibility during upgrade
-    res.__set_query_id(TUniqueId());
-    res.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
-    res.__set_done(is_done);
-    return res;
-}
-
 void RuntimeQueryStatiticsMgr::start_report_thread() {
     if (started.load()) {
         DCHECK(false) << "report thread has been started";
@@ -230,8 +189,7 @@ void 
RuntimeQueryStatiticsMgr::report_query_profiles_thread() {
         {
             std::unique_lock<std::mutex> lock(_report_profile_mutex);
 
-            while (_query_profile_map.empty() && _profile_map_x.empty() &&
-                   !_report_profile_thread_stop) {
+            while (_profile_map.empty() && !_report_profile_thread_stop) {
                 _report_profile_cv.wait_for(lock, std::chrono::seconds(3));
             }
         }
@@ -273,37 +231,7 @@ void RuntimeQueryStatiticsMgr::stop_report_thread() {
     LOG_INFO("All report threads stopped");
 }
 
-void RuntimeQueryStatiticsMgr::register_instance_profile(
-        const TUniqueId& query_id, const TNetworkAddress& coor_addr, const 
TUniqueId& instance_id,
-        std::shared_ptr<TRuntimeProfileTree> instance_profile,
-        std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
-    if (instance_profile == nullptr) {
-        auto msg = fmt::format("Register instance profile {} {} failed, 
profile is null",
-                               print_id(query_id), print_id(instance_id));
-        DCHECK(false) << msg;
-        LOG_ERROR(msg);
-        return;
-    }
-
-    std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
-
-    if (!_query_profile_map.contains(query_id)) {
-        _query_profile_map[query_id] = std::make_tuple(
-                coor_addr, std::unordered_map<TUniqueId, 
std::shared_ptr<TRuntimeProfileTree>>());
-    }
-
-    std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>& 
instance_profile_map =
-            std::get<1>(_query_profile_map[query_id]);
-    instance_profile_map.insert(std::make_pair(instance_id, instance_profile));
-
-    if (load_channel_profile != nullptr) {
-        _load_channel_profile_map[instance_id] = load_channel_profile;
-    }
-
-    LOG_INFO("Register instance profile {} {}", print_id(query_id), 
print_id(instance_id));
-}
-
-void RuntimeQueryStatiticsMgr::register_fragment_profile_x(
+void RuntimeQueryStatiticsMgr::register_fragment_profile(
         const TUniqueId& query_id, const TNetworkAddress& coor_addr, int32_t 
fragment_id,
         std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles,
         std::shared_ptr<TRuntimeProfileTree> load_channel_profile_x) {
@@ -319,95 +247,34 @@ void 
RuntimeQueryStatiticsMgr::register_fragment_profile_x(
 
     std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
 
-    if (!_profile_map_x.contains(query_id)) {
-        _profile_map_x[query_id] = std::make_tuple(
+    if (!_profile_map.contains(query_id)) {
+        _profile_map[query_id] = std::make_tuple(
                 coor_addr,
                 std::unordered_map<int, 
std::vector<std::shared_ptr<TRuntimeProfileTree>>>());
     }
 
     std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
-            fragment_profile_map = std::get<1>(_profile_map_x[query_id]);
+            fragment_profile_map = std::get<1>(_profile_map[query_id]);
     fragment_profile_map.insert(std::make_pair(fragment_id, p_profiles));
 
     if (load_channel_profile_x != nullptr) {
-        _load_channel_profile_map_x[std::make_pair(query_id, fragment_id)] = 
load_channel_profile_x;
+        _load_channel_profile_map[std::make_pair(query_id, fragment_id)] = 
load_channel_profile_x;
     }
 
     LOG_INFO("register x profile done {}, fragment {}, profiles {}", 
print_id(query_id),
              fragment_id, p_profiles.size());
 }
 
-void RuntimeQueryStatiticsMgr::_report_query_profiles_non_pipeline() {
-    // query_id -> {coordinator_addr, {instance_id -> instance_profile}}
-    decltype(_query_profile_map) profile_copy;
+void RuntimeQueryStatiticsMgr::_report_query_profiles_function() {
+    decltype(_profile_map) profile_copy;
     decltype(_load_channel_profile_map) load_channel_profile_copy;
 
     {
         std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
-        _query_profile_map.swap(profile_copy);
+        _profile_map.swap(profile_copy);
         _load_channel_profile_map.swap(load_channel_profile_copy);
     }
 
-    // query_id -> {coordinator_addr, {instance_id -> instance_profile}}
-    for (const auto& entry : profile_copy) {
-        const auto& query_id = entry.first;
-        const auto& coor_addr = std::get<0>(entry.second);
-        const std::unordered_map<TUniqueId, 
std::shared_ptr<TRuntimeProfileTree>>&
-                instance_id_to_profile = std::get<1>(entry.second);
-
-        for (const auto& profile_entry : instance_id_to_profile) {
-            const auto& instance_id = profile_entry.first;
-            const auto& instance_profile = profile_entry.second;
-
-            if (instance_profile == nullptr) {
-                auto msg = fmt::format("Query {} instance {} profile is null", 
print_id(query_id),
-                                       print_id(instance_id));
-                DCHECK(false) << msg;
-                LOG_ERROR(msg);
-                continue;
-            }
-        }
-
-        std::vector<std::shared_ptr<TRuntimeProfileTree>> 
load_channel_profiles;
-        for (const auto& load_channel_profile : load_channel_profile_copy) {
-            if (load_channel_profile.second == nullptr) {
-                auto msg = fmt::format(
-                        "Register fragment profile {} {} failed, load channel 
profile is null",
-                        print_id(query_id), -1);
-                DCHECK(false) << msg;
-                LOG_ERROR(msg);
-                continue;
-            }
-
-            load_channel_profiles.push_back(load_channel_profile.second);
-        }
-
-        TReportExecStatusParams req = 
create_report_exec_status_params_non_pipeline(
-                query_id, instance_id_to_profile, load_channel_profiles, 
/*is_done=*/true);
-        TReportExecStatusResult res;
-        auto rpc_status = _do_report_exec_stats_rpc(coor_addr, req, res);
-
-        if (res.status.status_code != TStatusCode::OK) {
-            std::stringstream ss;
-            res.status.printTo(ss);
-            LOG_WARNING("Query {} send profile to {} failed, msg: {}", 
print_id(query_id),
-                        PrintThriftNetworkAddress(coor_addr), ss.str());
-        } else {
-            LOG_INFO("Send {} profile finished", print_id(query_id));
-        }
-    }
-}
-
-void RuntimeQueryStatiticsMgr::_report_query_profiles_x() {
-    decltype(_profile_map_x) profile_copy;
-    decltype(_load_channel_profile_map_x) load_channel_profile_copy;
-
-    {
-        std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
-        _profile_map_x.swap(profile_copy);
-        _load_channel_profile_map_x.swap(load_channel_profile_copy);
-    }
-
     // query_id -> {coordinator_addr, {fragment_id -> 
std::vector<pipeline_profile>}}
     for (auto& entry : profile_copy) {
         const auto& query_id = entry.first;
@@ -435,7 +302,7 @@ void RuntimeQueryStatiticsMgr::_report_query_profiles_x() {
             load_channel_profiles.push_back(load_channel_profile.second);
         }
 
-        TReportExecStatusParams req = create_report_exec_status_params_x(
+        TReportExecStatusParams req = create_report_exec_status_params(
                 query_id, std::move(fragment_profile_map), 
std::move(load_channel_profiles),
                 /*is_done=*/true);
         TReportExecStatusResult res;
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h 
b/be/src/runtime/runtime_query_statistics_mgr.h
index ff61f665342..5fb2332c335 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -69,19 +69,12 @@ public:
     RuntimeQueryStatiticsMgr() = default;
     ~RuntimeQueryStatiticsMgr() = default;
 
-    static TReportExecStatusParams create_report_exec_status_params_x(
+    static TReportExecStatusParams create_report_exec_status_params(
             const TUniqueId& q_id,
             std::unordered_map<int32, 
std::vector<std::shared_ptr<TRuntimeProfileTree>>>
                     fragment_id_to_profile,
             std::vector<std::shared_ptr<TRuntimeProfileTree>> 
load_channel_profile, bool is_done);
 
-    static TReportExecStatusParams 
create_report_exec_status_params_non_pipeline(
-            const TUniqueId& q_id,
-            const std::unordered_map<TUniqueId, 
std::shared_ptr<TRuntimeProfileTree>>&
-                    instance_id_to_profile,
-            const std::vector<std::shared_ptr<TRuntimeProfileTree>>& 
load_channel_profile,
-            bool is_done);
-
     void register_query_statistics(std::string query_id, 
std::shared_ptr<QueryStatistics> qs_ptr,
                                    TNetworkAddress fe_addr, TQueryType::type 
query_type);
 
@@ -105,15 +98,10 @@ public:
     void trigger_report_profile();
     void stop_report_thread();
 
-    void register_instance_profile(const TUniqueId& query_id, const 
TNetworkAddress& coor_addr,
-                                   const TUniqueId& instance_id,
-                                   std::shared_ptr<TRuntimeProfileTree> 
instance_profile,
-                                   std::shared_ptr<TRuntimeProfileTree> 
load_channel_profile);
-
-    void register_fragment_profile_x(const TUniqueId& query_id, const 
TNetworkAddress& const_addr,
-                                     int32_t fragment_id,
-                                     
std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles,
-                                     std::shared_ptr<TRuntimeProfileTree> 
load_channel_profile_x);
+    void register_fragment_profile(const TUniqueId& query_id, const 
TNetworkAddress& const_addr,
+                                   int32_t fragment_id,
+                                   
std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles,
+                                   std::shared_ptr<TRuntimeProfileTree> 
load_channel_profile_x);
 
 private:
     std::shared_mutex _qs_ctx_map_lock;
@@ -125,13 +113,7 @@ private:
     std::condition_variable _report_profile_cv;
     bool _report_profile_thread_stop = false;
 
-    void _report_query_profiles_function() {
-        _report_query_profiles_x();
-        _report_query_profiles_non_pipeline();
-    }
-
-    void _report_query_profiles_x();
-    void _report_query_profiles_non_pipeline();
+    void _report_query_profiles_function();
 
     std::shared_mutex _query_profile_map_lock;
 
@@ -140,17 +122,9 @@ private:
             TUniqueId,
             std::tuple<TNetworkAddress,
                        std::unordered_map<int, 
std::vector<std::shared_ptr<TRuntimeProfileTree>>>>>
-            _profile_map_x;
+            _profile_map;
     std::unordered_map<std::pair<TUniqueId, int32_t>, 
std::shared_ptr<TRuntimeProfileTree>>
-            _load_channel_profile_map_x;
-
-    // query_id -> {coordinator_addr, {instance_id -> instance_profile}}
-    std::unordered_map<
-            TUniqueId,
-            std::tuple<TNetworkAddress,
-                       std::unordered_map<TUniqueId, 
std::shared_ptr<TRuntimeProfileTree>>>>
-            _query_profile_map;
-    std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> 
_load_channel_profile_map;
+            _load_channel_profile_map;
 };
 
 } // namespace doris
\ No newline at end of file
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index e8b450b530c..9c08ab343d6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -78,7 +78,6 @@ public class ExecutionProfile {
     private RuntimeProfile loadChannelProfile;
     // FragmentId -> InstanceId -> RuntimeProfile
     private Map<PlanFragmentId, Map<TUniqueId, RuntimeProfile>> 
fragmentInstancesProfiles;
-    private boolean isPipelineXProfile = false;
 
     // use to merge profile from multi be
     private Map<Integer, Map<TNetworkAddress, List<RuntimeProfile>>> 
multiBeProfile = null;
@@ -138,7 +137,7 @@ public class ExecutionProfile {
         return allPipelines;
     }
 
-    private RuntimeProfile getPipelineXAggregatedProfile(Map<Integer, String> 
planNodeMap) {
+    private RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String> 
planNodeMap) {
         RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
         for (int i = 0; i < fragmentProfiles.size(); ++i) {
             RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " 
+ i);
@@ -158,80 +157,34 @@ public class ExecutionProfile {
         return fragmentsProfile;
     }
 
-    private RuntimeProfile getNonPipelineXAggregatedProfile(Map<Integer, 
String> planNodeMap) {
-        RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
-        for (int i = 0; i < fragmentProfiles.size(); ++i) {
-            RuntimeProfile oldFragmentProfile = 
fragmentProfiles.get(seqNoToFragmentId.get(i));
-            RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " 
+ i);
-            fragmentsProfile.addChild(newFragmentProfile);
-            List<RuntimeProfile> allInstanceProfiles = new 
ArrayList<RuntimeProfile>();
-            for (Pair<RuntimeProfile, Boolean> runtimeProfile : 
oldFragmentProfile.getChildList()) {
-                allInstanceProfiles.add(runtimeProfile.first);
-            }
-            RuntimeProfile mergedInstanceProfile = new 
RuntimeProfile("Instance" + "(instance_num="
-                    + allInstanceProfiles.size() + ")", 
allInstanceProfiles.get(0).nodeId());
-            newFragmentProfile.addChild(mergedInstanceProfile);
-            RuntimeProfile.mergeProfiles(allInstanceProfiles, 
mergedInstanceProfile, planNodeMap);
-        }
-        return fragmentsProfile;
-    }
-
     public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String> 
planNodeMap) {
-        if (isPipelineXProfile) {
-            /*
-             * Fragment 0
-             * ---Pipeline 0
-             * ------pipelineTask 0
-             * ------pipelineTask 0
-             * ------pipelineTask 0
-             * ---Pipeline 1
-             * ------pipelineTask 1
-             * ---Pipeline 2
-             * ------pipelineTask 2
-             * ------pipelineTask 2
-             * Fragment 1
-             * ---Pipeline 0
-             * ------......
-             * ---Pipeline 1
-             * ------......
-             * ---Pipeline 2
-             * ------......
-             * ......
-             */
-            return getPipelineXAggregatedProfile(planNodeMap);
-        } else {
-            /*
-             * Fragment 0
-             * ---Instance 0
-             * ------pipelineTask 0
-             * ------pipelineTask 1
-             * ------pipelineTask 2
-             * ---Instance 1
-             * ------pipelineTask 0
-             * ------pipelineTask 1
-             * ------pipelineTask 2
-             * ---Instance 2
-             * ------pipelineTask 0
-             * ------pipelineTask 1
-             * ------pipelineTask 2
-             * Fragment 1
-             * ---Instance 0
-             * ---Instance 1
-             * ---Instance 2
-             * ......
-             */
-            return getNonPipelineXAggregatedProfile(planNodeMap);
-        }
+        /*
+            * Fragment 0
+            * ---Pipeline 0
+            * ------pipelineTask 0
+            * ------pipelineTask 0
+            * ------pipelineTask 0
+            * ---Pipeline 1
+            * ------pipelineTask 1
+            * ---Pipeline 2
+            * ------pipelineTask 2
+            * ------pipelineTask 2
+            * Fragment 1
+            * ---Pipeline 0
+            * ------......
+            * ---Pipeline 1
+            * ------......
+            * ---Pipeline 2
+            * ------......
+            * ......
+            */
+        return getPipelineAggregatedProfile(planNodeMap);
     }
 
     public RuntimeProfile getRoot() {
         return root;
     }
 
-    public void setPipelineX() {
-        this.isPipelineXProfile = true;
-    }
-
     // The execution profile is maintained in ProfileManager, if it is 
finished, then should
     // remove it from it as soon as possible.
     public void update(long startTime, boolean isFinished) {
@@ -255,76 +208,32 @@ public class ExecutionProfile {
             return new Status(TStatusCode.INVALID_ARGUMENT, "QueryId is not 
set");
         }
 
-        if (isPipelineXProfile) {
-            if (!profile.isSetFragmentIdToProfile()) {
-                LOG.warn("{} FragmentIdToProfile is not set", 
DebugUtil.printId(profile.getQueryId()));
-                return new Status(TStatusCode.INVALID_ARGUMENT, 
"FragmentIdToProfile is not set");
-            }
-
-            for (Entry<Integer, List<TDetailedReportParams>> entry : 
profile.getFragmentIdToProfile().entrySet()) {
-                int fragmentId = entry.getKey();
-                List<TDetailedReportParams> fragmentProfile = entry.getValue();
-                int pipelineIdx = 0;
-                List<RuntimeProfile> taskProfile = Lists.newArrayList();
-                for (TDetailedReportParams pipelineProfile : fragmentProfile) {
-                    String name = "Pipeline :" + pipelineIdx + " "
-                            + " (host=" + backendHBAddress + ")";
-                    RuntimeProfile profileNode = new RuntimeProfile(name);
-                    taskProfile.add(profileNode);
-                    if (!pipelineProfile.isSetProfile()) {
-                        LOG.warn("Profile is not set, {}", 
DebugUtil.printId(profile.getQueryId()));
-                        return new Status(TStatusCode.INVALID_ARGUMENT, 
"Profile is not set");
-                    }
-
-                    profileNode.update(pipelineProfile.profile);
-                    profileNode.setIsDone(isDone);
-                    pipelineIdx++;
-                    fragmentProfiles.get(fragmentId).addChild(profileNode);
-                }
-                multiBeProfile.get(fragmentId).put(backendHBAddress, 
taskProfile);
-            }
-        } else {
-            if (!profile.isSetInstanceProfiles() || 
!profile.isSetFragmentInstanceIds()) {
-                LOG.warn("InstanceIdToProfile is not set, {}", 
DebugUtil.printId(profile.getQueryId()));
-                return new Status(TStatusCode.INVALID_ARGUMENT, 
"InstanceIdToProfile is not set");
-            }
-
-            if (profile.fragment_instance_ids.size() != 
profile.instance_profiles.size()) {
-                LOG.warn("InstanceIdToProfile size is not equal, {}",
-                        DebugUtil.printId(profile.getQueryId()));
-                return new Status(TStatusCode.INVALID_ARGUMENT, 
"InstanceIdToProfile size is not equal");
-            }
+        if (!profile.isSetFragmentIdToProfile()) {
+            LOG.warn("{} FragmentIdToProfile is not set", 
DebugUtil.printId(profile.getQueryId()));
+            return new Status(TStatusCode.INVALID_ARGUMENT, 
"FragmentIdToProfile is not set");
+        }
 
-            for (int idx = 0; idx < profile.getFragmentInstanceIdsSize(); 
idx++) {
-                TUniqueId instanceId = 
profile.getFragmentInstanceIds().get(idx);
-                TRuntimeProfileTree instanceProfile = 
profile.getInstanceProfiles().get(idx);
-                if (instanceProfile == null) {
-                    LOG.warn("Profile is not set {}", 
DebugUtil.printId(profile.getQueryId()));
+        for (Entry<Integer, List<TDetailedReportParams>> entry : 
profile.getFragmentIdToProfile().entrySet()) {
+            int fragmentId = entry.getKey();
+            List<TDetailedReportParams> fragmentProfile = entry.getValue();
+            int pipelineIdx = 0;
+            List<RuntimeProfile> taskProfile = Lists.newArrayList();
+            for (TDetailedReportParams pipelineProfile : fragmentProfile) {
+                String name = "Pipeline :" + pipelineIdx + " "
+                        + " (host=" + backendHBAddress + ")";
+                RuntimeProfile profileNode = new RuntimeProfile(name);
+                taskProfile.add(profileNode);
+                if (!pipelineProfile.isSetProfile()) {
+                    LOG.warn("Profile is not set, {}", 
DebugUtil.printId(profile.getQueryId()));
                     return new Status(TStatusCode.INVALID_ARGUMENT, "Profile 
is not set");
                 }
 
-                PlanFragmentId fragmentId = 
instanceIdToFragmentId.get(instanceId);
-                if (fragmentId == null) {
-                    LOG.warn("Could not find related fragment for instance {}",
-                            DebugUtil.printId(instanceId));
-                    return new Status(TStatusCode.INVALID_ARGUMENT, "Could not 
find related fragment");
-                }
-
-                // Do not use fragment id in params, because non-pipeline 
engine will set it to -1
-                Map<TUniqueId, RuntimeProfile> instanceProfiles = 
fragmentInstancesProfiles.get(fragmentId);
-                if (instanceProfiles == null) {
-                    LOG.warn("Could not find related instances for fragment 
{}", fragmentId);
-                    return new Status(TStatusCode.INVALID_ARGUMENT, "Could not 
find related instance");
-                }
-
-                RuntimeProfile curInstanceProfile = 
instanceProfiles.get(instanceId);
-                if (curInstanceProfile == null) {
-                    LOG.warn("Could not find related profile {}", 
DebugUtil.printId(instanceId));
-                    return new Status(TStatusCode.INVALID_ARGUMENT, "Could not 
find related instance");
-                }
-                curInstanceProfile.setIsDone(isDone);
-                curInstanceProfile.update(instanceProfile);
+                profileNode.update(pipelineProfile.profile);
+                profileNode.setIsDone(isDone);
+                pipelineIdx++;
+                fragmentProfiles.get(fragmentId).addChild(profileNode);
             }
+            multiBeProfile.get(fragmentId).put(backendHBAddress, taskProfile);
         }
 
         if (profile.isSetLoadChannelProfiles()) {
@@ -348,59 +257,32 @@ public class ExecutionProfile {
             LOG.warn("backend id is not set in report profile request, bad 
message");
             return;
         }
-        if (isPipelineXProfile) {
-            int pipelineIdx = 0;
-            List<RuntimeProfile> taskProfile = Lists.newArrayList();
-            String suffix = " (host=" + backend.getHeartbeatAddress() + ")";
-            for (TDetailedReportParams param : params.detailed_report) {
-                String name = param.isSetIsFragmentLevel() && 
param.is_fragment_level ? "Fragment Level Profile: "
-                        + suffix : "Pipeline :" + pipelineIdx + " " + suffix;
-                RuntimeProfile profile = new RuntimeProfile(name);
-                taskProfile.add(profile);
-                if (param.isSetProfile()) {
-                    profile.update(param.profile);
-                }
-                if (params.done) {
-                    profile.setIsDone(true);
-                }
-                pipelineIdx++;
-                profile.sortChildren();
-                fragmentProfiles.get(params.fragment_id).addChild(profile);
-            }
-            // TODO ygl: is this right? there maybe multi Backends, what does
-            // update load profile do???
-            if (params.isSetLoadChannelProfile()) {
-                loadChannelProfile.update(params.loadChannelProfile);
-            }
-            
multiBeProfile.get(params.fragment_id).put(backend.getHeartbeatAddress(), 
taskProfile);
-        } else {
-            PlanFragmentId fragmentId = 
instanceIdToFragmentId.get(params.fragment_instance_id);
-            if (fragmentId == null) {
-                LOG.warn("Could not find related fragment for instance {}",
-                        DebugUtil.printId(params.fragment_instance_id));
-                return;
-            }
-            // Do not use fragment id in params, because non-pipeline engine 
will set it to -1
-            Map<TUniqueId, RuntimeProfile> instanceProfiles = 
fragmentInstancesProfiles.get(fragmentId);
-            if (instanceProfiles == null) {
-                LOG.warn("Could not find related instances for fragment {}", 
fragmentId);
-                return;
-            }
-            RuntimeProfile instanceProfile = 
instanceProfiles.get(params.fragment_instance_id);
-            if (instanceProfile == null) {
-                LOG.warn("Could not find related instance {}", 
params.fragment_instance_id);
-                return;
-            }
-            if (params.isSetProfile()) {
-                instanceProfile.update(params.profile);
-            }
-            if (params.isSetDone() && params.isDone()) {
-                instanceProfile.setIsDone(true);
+
+        int pipelineIdx = 0;
+        List<RuntimeProfile> taskProfile = Lists.newArrayList();
+        String suffix = " (host=" + backend.getHeartbeatAddress() + ")";
+        for (TDetailedReportParams param : params.detailed_report) {
+            String name = param.isSetIsFragmentLevel() && 
param.is_fragment_level ? "Fragment Level Profile: "
+                    + suffix : "Pipeline :" + pipelineIdx + " " + suffix;
+            RuntimeProfile profile = new RuntimeProfile(name);
+            taskProfile.add(profile);
+            if (param.isSetProfile()) {
+                profile.update(param.profile);
             }
-            if (params.isSetLoadChannelProfile()) {
-                loadChannelProfile.update(params.loadChannelProfile);
+            if (params.done) {
+                profile.setIsDone(true);
             }
+            pipelineIdx++;
+            profile.sortChildren();
+            fragmentProfiles.get(params.fragment_id).addChild(profile);
+        }
+        // TODO ygl: is this right? there maybe multi Backends, what does
+        // update load profile do???
+        if (params.isSetLoadChannelProfile()) {
+            loadChannelProfile.update(params.loadChannelProfile);
         }
+
+        
multiBeProfile.get(params.fragment_id).put(backend.getHeartbeatAddress(), 
taskProfile);
     }
 
     // MultiInstances may update the profile concurrently
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
index c0d3614550d..12b3b903880 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
@@ -79,9 +79,6 @@ public class Profile {
             LOG.warn("try to set a null excecution profile, it is abnormal", 
new Exception());
             return;
         }
-        if (this.isPipelineX) {
-            executionProfile.setPipelineX();
-        }
         executionProfile.setSummaryProfile(summaryProfile);
         this.executionProfiles.add(executionProfile);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 265d3afdb35..94e7d59625a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -387,10 +387,6 @@ public class Coordinator implements CoordInterface {
         // https://github.com/apache/doris/blob/bd6f5b6a0e5f1b12744607336123d7f97eb76af9/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java#L155
         this.enablePipelineEngine = Config.enable_pipeline_load;
         this.enablePipelineXEngine = Config.enable_pipeline_load;
-        // make sure Coordinator can update profile correctlly
-        if (this.enablePipelineXEngine) {
-            this.executionProfile.setPipelineX();
-        }
     }
 
     private void setFromUserProperty(ConnectContext connectContext) {
@@ -3248,16 +3244,7 @@ public class Coordinator implements CoordInterface {
 
             this.lastMissingHeartbeatTime = 
backend.getLastMissingHeartbeatTime();
             this.enablePipelineX = enablePipelineX;
-            if (this.enablePipelineX) {
-                executionProfile.addFragmentBackend(fragmentId, backendId);
-            } else {
-                for (TPipelineInstanceParams instanceParam : 
rpcParams.local_params) {
-                    String profileName = "Instance " + 
DebugUtil.printId(instanceParam.fragment_instance_id)
-                            + " (host=" + address + ")";
-                    executionProfile.addInstanceProfile(fragmentId, 
instanceParam.fragment_instance_id,
-                        new RuntimeProfile(profileName));
-                }
-            }
+            executionProfile.addFragmentBackend(fragmentId, backendId);
         }
 
         /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to