This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 174eea7246e (chore)[profile] remove non-pipeline logical from profile (#35877) 174eea7246e is described below commit 174eea7246e13ae44f966462b9e93a8c6be5df61 Author: zhiqiang <seuhezhiqi...@163.com> AuthorDate: Wed Jun 5 09:44:54 2024 +0800 (chore)[profile] remove non-pipeline logical from profile (#35877) Remove the code for non-pipeline profile processing. --- be/src/pipeline/pipeline_fragment_context.cpp | 4 +- be/src/runtime/fragment_mgr.cpp | 2 +- be/src/runtime/query_context.cpp | 69 ++---- be/src/runtime/query_context.h | 26 +-- be/src/runtime/runtime_query_statistics_mgr.cpp | 155 +------------ be/src/runtime/runtime_query_statistics_mgr.h | 42 +--- .../doris/common/profile/ExecutionProfile.java | 250 ++++++--------------- .../org/apache/doris/common/profile/Profile.java | 3 - .../main/java/org/apache/doris/qe/Coordinator.java | 15 +- 9 files changed, 109 insertions(+), 457 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index a9db08d0d70..9ddbd1b9150 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1557,8 +1557,8 @@ void PipelineFragmentContext::_close_fragment_instance() { } if (_query_ctx->enable_profile()) { - _query_ctx->add_fragment_profile_x(_fragment_id, collect_realtime_profile_x(), - collect_realtime_load_channel_profile_x()); + _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile_x(), + collect_realtime_load_channel_profile_x()); } // all submitted tasks done diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index fc9fbc9764a..bb3ef4ff0f8 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1261,7 +1261,7 @@ Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id, } if (query_context->enable_pipeline_x_exec()) { - *exec_status = query_context->get_realtime_exec_status_x(); + *exec_status = query_context->get_realtime_exec_status(); } return Status::OK(); diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index dcc74c40e1c..a8efd4d9392 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -353,7 +353,7 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) { return Status::OK(); } -void QueryContext::add_fragment_profile_x( +void QueryContext::add_fragment_profile( int fragment_id, const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles, std::shared_ptr<TRuntimeProfileTree> load_channel_profile) { if (pipeline_profiles.empty()) { @@ -375,70 +375,27 @@ void QueryContext::add_fragment_profile_x( LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline profile count {} ", print_id(this->_query_id), fragment_id, pipeline_profiles.size()); - _profile_map_x.insert(std::make_pair(fragment_id, pipeline_profiles)); + _profile_map.insert(std::make_pair(fragment_id, pipeline_profiles)); if (load_channel_profile != nullptr) { - _load_channel_profile_map_x.insert(std::make_pair(fragment_id, load_channel_profile)); - } -} - -void QueryContext::add_instance_profile(const TUniqueId& instance_id, - std::shared_ptr<TRuntimeProfileTree> profile, - std::shared_ptr<TRuntimeProfileTree> load_channel_profile) { - DCHECK(profile != nullptr) << print_id(instance_id); - - std::lock_guard<std::mutex> lg(_profile_mutex); - _profile_map.insert(std::make_pair(instance_id, profile)); - if (load_channel_profile != nullptr) { - _load_channel_profile_map.insert(std::make_pair(instance_id, load_channel_profile)); + _load_channel_profile_map.insert(std::make_pair(fragment_id, load_channel_profile)); } } void QueryContext::_report_query_profile() { - _report_query_profile_x(); - _report_query_profile_non_pipeline(); -} - -void QueryContext::_report_query_profile_non_pipeline() { - if (enable_pipeline_x_exec()) { - return; - } - - std::lock_guard<std::mutex> lg(_profile_mutex); - LOG_INFO("Query {}, register query profile, instance profile count {}", print_id(_query_id), - _profile_map.size()); - - for (auto& [instance_id, instance_profile] : _profile_map) { - std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr; - if (_load_channel_profile_map.contains(instance_id)) { - load_channel_profile = _load_channel_profile_map[instance_id]; - } - - ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_instance_profile( - _query_id, this->coord_addr, instance_id, instance_profile, load_channel_profile); - } - - ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile(); -} - -void QueryContext::_report_query_profile_x() { - if (!enable_pipeline_x_exec()) { - return; - } - std::lock_guard<std::mutex> lg(_profile_mutex); LOG_INFO( "Pipeline x query context, register query profile, query {}, fragment profile count {}", - print_id(_query_id), _profile_map_x.size()); + print_id(_query_id), _profile_map.size()); - for (auto& [fragment_id, fragment_profile] : _profile_map_x) { + for (auto& [fragment_id, fragment_profile] : _profile_map) { std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr; - if (_load_channel_profile_map_x.contains(fragment_id)) { - load_channel_profile = _load_channel_profile_map_x[fragment_id]; + if (_load_channel_profile_map.contains(fragment_id)) { + load_channel_profile = _load_channel_profile_map[fragment_id]; } - ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile_x( + ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile( _query_id, this->coord_addr, fragment_id, fragment_profile, load_channel_profile); } @@ -446,7 +403,7 @@ void QueryContext::_report_query_profile_x() { } std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> -QueryContext::_collect_realtime_query_profile_x() const { +QueryContext::_collect_realtime_query_profile() const { std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> res; if (!enable_pipeline_x_exec()) { @@ -482,20 +439,20 @@ QueryContext::_collect_realtime_query_profile_x() const { return res; } -TReportExecStatusParams QueryContext::get_realtime_exec_status_x() const { +TReportExecStatusParams QueryContext::get_realtime_exec_status() const { TReportExecStatusParams exec_status; if (enable_pipeline_x_exec()) { - auto realtime_query_profile = _collect_realtime_query_profile_x(); + auto realtime_query_profile = _collect_realtime_query_profile(); std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles; - for (auto load_channel_profile : _load_channel_profile_map_x) { + for (auto load_channel_profile : _load_channel_profile_map) { if (load_channel_profile.second != nullptr) { load_channel_profiles.push_back(load_channel_profile.second); } } - exec_status = RuntimeQueryStatiticsMgr::create_report_exec_status_params_x( + exec_status = RuntimeQueryStatiticsMgr::create_report_exec_status_params( this->_query_id, std::move(realtime_query_profile), std::move(load_channel_profiles), /*is_done=*/false); } else { diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 318afd69187..dc7ea7e29bf 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -322,7 +322,7 @@ private: std::mutex _profile_mutex; - // when fragment of pipeline x is closed, it will register its profile to this map by using add_fragment_profile_x + // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile // flatten profile of one fragment: // Pipeline 0 // PipelineTask 0 @@ -339,34 +339,22 @@ private: // PipelineTask 3 // Operator 3 // fragment_id -> list<profile> - std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> _profile_map_x; - std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>> _load_channel_profile_map_x; - - // instance_id -> profile - std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> _profile_map; - std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> _load_channel_profile_map; + std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> _profile_map; + std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>> _load_channel_profile_map; void _report_query_profile(); - void _report_query_profile_non_pipeline(); - void _report_query_profile_x(); std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> - _collect_realtime_query_profile_x() const; - - std::unordered_map<TUniqueId, std::vector<std::shared_ptr<TRuntimeProfileTree>>> - _collect_realtime_query_profile_non_pipeline() const; + _collect_realtime_query_profile() const; public: - // when fragment of pipeline x is closed, it will register its profile to this map by using add_fragment_profile_x - void add_fragment_profile_x( + // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile + void add_fragment_profile( int fragment_id, const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profile, std::shared_ptr<TRuntimeProfileTree> load_channel_profile); - void add_instance_profile(const TUniqueId& iid, std::shared_ptr<TRuntimeProfileTree> profile, - std::shared_ptr<TRuntimeProfileTree> load_channel_profile); - - TReportExecStatusParams get_realtime_exec_status_x() const; + TReportExecStatusParams get_realtime_exec_status() const; bool enable_profile() const { return _query_options.__isset.enable_profile && _query_options.enable_profile; diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 3e3dd3de2dd..dda41936284 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -101,7 +101,7 @@ static Status _do_report_exec_stats_rpc(const TNetworkAddress& coor_addr, return Status::OK(); } -TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_params_x( +TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_params( const TUniqueId& query_id, std::unordered_map<int32, std::vector<std::shared_ptr<TRuntimeProfileTree>>> fragment_id_to_profile, @@ -169,47 +169,6 @@ TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_para return req; } -TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_params_non_pipeline( - const TUniqueId& query_id, - const std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>& - instance_id_to_profile, - const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profile, - bool is_done) { - TQueryProfile profile; - std::vector<TUniqueId> fragment_instance_ids; - std::vector<TRuntimeProfileTree> instance_profiles; - std::vector<TRuntimeProfileTree> load_channel_profiles; - - for (auto entry : instance_id_to_profile) { - TUniqueId instance_id = entry.first; - std::shared_ptr<TRuntimeProfileTree> profile = entry.second; - - if (profile == nullptr) { - auto msg = fmt::format("Register instance profile {} {} failed, profile is null", - print_id(query_id), print_id(instance_id)); - DCHECK(false) << msg; - LOG_ERROR(msg); - continue; - } - - fragment_instance_ids.push_back(instance_id); - instance_profiles.push_back(*profile); - } - - profile.__set_query_id(query_id); - profile.__set_fragment_instance_ids(fragment_instance_ids); - profile.__set_instance_profiles(instance_profiles); - profile.__set_load_channel_profiles(load_channel_profiles); - - TReportExecStatusParams res; - res.__set_query_profile(profile); - // invalid query id to avoid API compatibility during upgrade - res.__set_query_id(TUniqueId()); - res.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id); - res.__set_done(is_done); - return res; -} - void RuntimeQueryStatiticsMgr::start_report_thread() { if (started.load()) { DCHECK(false) << "report thread has been started"; @@ -230,8 +189,7 @@ void RuntimeQueryStatiticsMgr::report_query_profiles_thread() { { std::unique_lock<std::mutex> lock(_report_profile_mutex); - while (_query_profile_map.empty() && _profile_map_x.empty() && - !_report_profile_thread_stop) { + while (_profile_map.empty() && !_report_profile_thread_stop) { _report_profile_cv.wait_for(lock, std::chrono::seconds(3)); } } @@ -273,37 +231,7 @@ void RuntimeQueryStatiticsMgr::stop_report_thread() { LOG_INFO("All report threads stopped"); } -void RuntimeQueryStatiticsMgr::register_instance_profile( - const TUniqueId& query_id, const TNetworkAddress& coor_addr, const TUniqueId& instance_id, - std::shared_ptr<TRuntimeProfileTree> instance_profile, - std::shared_ptr<TRuntimeProfileTree> load_channel_profile) { - if (instance_profile == nullptr) { - auto msg = fmt::format("Register instance profile {} {} failed, profile is null", - print_id(query_id), print_id(instance_id)); - DCHECK(false) << msg; - LOG_ERROR(msg); - return; - } - - std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock); - - if (!_query_profile_map.contains(query_id)) { - _query_profile_map[query_id] = std::make_tuple( - coor_addr, std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>()); - } - - std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>& instance_profile_map = - std::get<1>(_query_profile_map[query_id]); - instance_profile_map.insert(std::make_pair(instance_id, instance_profile)); - - if (load_channel_profile != nullptr) { - _load_channel_profile_map[instance_id] = load_channel_profile; - } - - LOG_INFO("Register instance profile {} {}", print_id(query_id), print_id(instance_id)); -} - -void RuntimeQueryStatiticsMgr::register_fragment_profile_x( +void RuntimeQueryStatiticsMgr::register_fragment_profile( const TUniqueId& query_id, const TNetworkAddress& coor_addr, int32_t fragment_id, std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles, std::shared_ptr<TRuntimeProfileTree> load_channel_profile_x) { @@ -319,95 +247,34 @@ void RuntimeQueryStatiticsMgr::register_fragment_profile_x( std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock); - if (!_profile_map_x.contains(query_id)) { - _profile_map_x[query_id] = std::make_tuple( + if (!_profile_map.contains(query_id)) { + _profile_map[query_id] = std::make_tuple( coor_addr, std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>()); } std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>& - fragment_profile_map = std::get<1>(_profile_map_x[query_id]); + fragment_profile_map = std::get<1>(_profile_map[query_id]); fragment_profile_map.insert(std::make_pair(fragment_id, p_profiles)); if (load_channel_profile_x != nullptr) { - _load_channel_profile_map_x[std::make_pair(query_id, fragment_id)] = load_channel_profile_x; + _load_channel_profile_map[std::make_pair(query_id, fragment_id)] = load_channel_profile_x; } LOG_INFO("register x profile done {}, fragment {}, profiles {}", print_id(query_id), fragment_id, p_profiles.size()); } -void RuntimeQueryStatiticsMgr::_report_query_profiles_non_pipeline() { - // query_id -> {coordinator_addr, {instance_id -> instance_profile}} - decltype(_query_profile_map) profile_copy; +void RuntimeQueryStatiticsMgr::_report_query_profiles_function() { + decltype(_profile_map) profile_copy; decltype(_load_channel_profile_map) load_channel_profile_copy; { std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock); - _query_profile_map.swap(profile_copy); + _profile_map.swap(profile_copy); _load_channel_profile_map.swap(load_channel_profile_copy); } - // query_id -> {coordinator_addr, {instance_id -> instance_profile}} - for (const auto& entry : profile_copy) { - const auto& query_id = entry.first; - const auto& coor_addr = std::get<0>(entry.second); - const std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>& - instance_id_to_profile = std::get<1>(entry.second); - - for (const auto& profile_entry : instance_id_to_profile) { - const auto& instance_id = profile_entry.first; - const auto& instance_profile = profile_entry.second; - - if (instance_profile == nullptr) { - auto msg = fmt::format("Query {} instance {} profile is null", print_id(query_id), - print_id(instance_id)); - DCHECK(false) << msg; - LOG_ERROR(msg); - continue; - } - } - - std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles; - for (const auto& load_channel_profile : load_channel_profile_copy) { - if (load_channel_profile.second == nullptr) { - auto msg = fmt::format( - "Register fragment profile {} {} failed, load channel profile is null", - print_id(query_id), -1); - DCHECK(false) << msg; - LOG_ERROR(msg); - continue; - } - - load_channel_profiles.push_back(load_channel_profile.second); - } - - TReportExecStatusParams req = create_report_exec_status_params_non_pipeline( - query_id, instance_id_to_profile, load_channel_profiles, /*is_done=*/true); - TReportExecStatusResult res; - auto rpc_status = _do_report_exec_stats_rpc(coor_addr, req, res); - - if (res.status.status_code != TStatusCode::OK) { - std::stringstream ss; - res.status.printTo(ss); - LOG_WARNING("Query {} send profile to {} failed, msg: {}", print_id(query_id), - PrintThriftNetworkAddress(coor_addr), ss.str()); - } else { - LOG_INFO("Send {} profile finished", print_id(query_id)); - } - } -} - -void RuntimeQueryStatiticsMgr::_report_query_profiles_x() { - decltype(_profile_map_x) profile_copy; - decltype(_load_channel_profile_map_x) load_channel_profile_copy; - - { - std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock); - _profile_map_x.swap(profile_copy); - _load_channel_profile_map_x.swap(load_channel_profile_copy); - } - // query_id -> {coordinator_addr, {fragment_id -> std::vectpr<pipeline_profile>}} for (auto& entry : profile_copy) { const auto& query_id = entry.first; @@ -435,7 +302,7 @@ void RuntimeQueryStatiticsMgr::_report_query_profiles_x() { load_channel_profiles.push_back(load_channel_profile.second); } - TReportExecStatusParams req = create_report_exec_status_params_x( + TReportExecStatusParams req = create_report_exec_status_params( query_id, std::move(fragment_profile_map), std::move(load_channel_profiles), /*is_done=*/true); TReportExecStatusResult res; diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h index ff61f665342..5fb2332c335 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.h +++ b/be/src/runtime/runtime_query_statistics_mgr.h @@ -69,19 +69,12 @@ public: RuntimeQueryStatiticsMgr() = default; ~RuntimeQueryStatiticsMgr() = default; - static TReportExecStatusParams create_report_exec_status_params_x( + static TReportExecStatusParams create_report_exec_status_params( const TUniqueId& q_id, std::unordered_map<int32, std::vector<std::shared_ptr<TRuntimeProfileTree>>> fragment_id_to_profile, std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profile, bool is_done); - static TReportExecStatusParams create_report_exec_status_params_non_pipeline( - const TUniqueId& q_id, - const std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>& - instance_id_to_profile, - const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profile, - bool is_done); - void register_query_statistics(std::string query_id, std::shared_ptr<QueryStatistics> qs_ptr, TNetworkAddress fe_addr, TQueryType::type query_type); @@ -105,15 +98,10 @@ public: void trigger_report_profile(); void stop_report_thread(); - void register_instance_profile(const TUniqueId& query_id, const TNetworkAddress& coor_addr, - const TUniqueId& instance_id, - std::shared_ptr<TRuntimeProfileTree> instance_profile, - std::shared_ptr<TRuntimeProfileTree> load_channel_profile); - - void register_fragment_profile_x(const TUniqueId& query_id, const TNetworkAddress& const_addr, - int32_t fragment_id, - std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles, - std::shared_ptr<TRuntimeProfileTree> load_channel_profile_x); + void register_fragment_profile(const TUniqueId& query_id, const TNetworkAddress& const_addr, + int32_t fragment_id, + std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles, + std::shared_ptr<TRuntimeProfileTree> load_channel_profile_x); private: std::shared_mutex _qs_ctx_map_lock; @@ -125,13 +113,7 @@ private: std::condition_variable _report_profile_cv; bool _report_profile_thread_stop = false; - void _report_query_profiles_function() { - _report_query_profiles_x(); - _report_query_profiles_non_pipeline(); - } - - void _report_query_profiles_x(); - void _report_query_profiles_non_pipeline(); + void _report_query_profiles_function(); std::shared_mutex _query_profile_map_lock; @@ -140,17 +122,9 @@ private: TUniqueId, std::tuple<TNetworkAddress, std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>>> - _profile_map_x; + _profile_map; std::unordered_map<std::pair<TUniqueId, int32_t>, std::shared_ptr<TRuntimeProfileTree>> - _load_channel_profile_map_x; - - // query_id -> {coordinator_addr, {instance_id -> instance_profile}} - std::unordered_map< - TUniqueId, - std::tuple<TNetworkAddress, - std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>>> - _query_profile_map; - std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> _load_channel_profile_map; + _load_channel_profile_map; }; } // namespace doris \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java index e8b450b530c..9c08ab343d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java @@ -78,7 +78,6 @@ public class ExecutionProfile { private RuntimeProfile loadChannelProfile; // FragmentId -> InstanceId -> RuntimeProfile private Map<PlanFragmentId, Map<TUniqueId, RuntimeProfile>> fragmentInstancesProfiles; - private boolean isPipelineXProfile = false; // use to merge profile from multi be private Map<Integer, Map<TNetworkAddress, List<RuntimeProfile>>> multiBeProfile = null; @@ -138,7 +137,7 @@ public class ExecutionProfile { return allPipelines; } - private RuntimeProfile getPipelineXAggregatedProfile(Map<Integer, String> planNodeMap) { + private RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String> planNodeMap) { RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments"); for (int i = 0; i < fragmentProfiles.size(); ++i) { RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " + i); @@ -158,80 +157,34 @@ public class ExecutionProfile { return fragmentsProfile; } - private RuntimeProfile getNonPipelineXAggregatedProfile(Map<Integer, String> planNodeMap) { - RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments"); - for (int i = 0; i < fragmentProfiles.size(); ++i) { - RuntimeProfile oldFragmentProfile = fragmentProfiles.get(seqNoToFragmentId.get(i)); - RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " + i); - fragmentsProfile.addChild(newFragmentProfile); - List<RuntimeProfile> allInstanceProfiles = new ArrayList<RuntimeProfile>(); - for (Pair<RuntimeProfile, Boolean> runtimeProfile : oldFragmentProfile.getChildList()) { - allInstanceProfiles.add(runtimeProfile.first); - } - RuntimeProfile mergedInstanceProfile = new RuntimeProfile("Instance" + "(instance_num=" - + allInstanceProfiles.size() + ")", allInstanceProfiles.get(0).nodeId()); - newFragmentProfile.addChild(mergedInstanceProfile); - RuntimeProfile.mergeProfiles(allInstanceProfiles, mergedInstanceProfile, planNodeMap); - } - return fragmentsProfile; - } - public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String> planNodeMap) { - if (isPipelineXProfile) { - /* - * Fragment 0 - * ---Pipeline 0 - * ------pipelineTask 0 - * ------pipelineTask 0 - * ------pipelineTask 0 - * ---Pipeline 1 - * ------pipelineTask 1 - * ---Pipeline 2 - * ------pipelineTask 2 - * ------pipelineTask 2 - * Fragment 1 - * ---Pipeline 0 - * ------...... - * ---Pipeline 1 - * ------...... - * ---Pipeline 2 - * ------...... - * ...... - */ - return getPipelineXAggregatedProfile(planNodeMap); - } else { - /* - * Fragment 0 - * ---Instance 0 - * ------pipelineTask 0 - * ------pipelineTask 1 - * ------pipelineTask 2 - * ---Instance 1 - * ------pipelineTask 0 - * ------pipelineTask 1 - * ------pipelineTask 2 - * ---Instance 2 - * ------pipelineTask 0 - * ------pipelineTask 1 - * ------pipelineTask 2 - * Fragment 1 - * ---Instance 0 - * ---Instance 1 - * ---Instance 2 - * ...... - */ - return getNonPipelineXAggregatedProfile(planNodeMap); - } + /* + * Fragment 0 + * ---Pipeline 0 + * ------pipelineTask 0 + * ------pipelineTask 0 + * ------pipelineTask 0 + * ---Pipeline 1 + * ------pipelineTask 1 + * ---Pipeline 2 + * ------pipelineTask 2 + * ------pipelineTask 2 + * Fragment 1 + * ---Pipeline 0 + * ------...... + * ---Pipeline 1 + * ------...... + * ---Pipeline 2 + * ------...... + * ...... + */ + return getPipelineAggregatedProfile(planNodeMap); } public RuntimeProfile getRoot() { return root; } - public void setPipelineX() { - this.isPipelineXProfile = true; - } - // The execution profile is maintained in ProfileManager, if it is finished, then should // remove it from it as soon as possible. public void update(long startTime, boolean isFinished) { @@ -255,76 +208,32 @@ public class ExecutionProfile { return new Status(TStatusCode.INVALID_ARGUMENT, "QueryId is not set"); } - if (isPipelineXProfile) { - if (!profile.isSetFragmentIdToProfile()) { - LOG.warn("{} FragmentIdToProfile is not set", DebugUtil.printId(profile.getQueryId())); - return new Status(TStatusCode.INVALID_ARGUMENT, "FragmentIdToProfile is not set"); - } - - for (Entry<Integer, List<TDetailedReportParams>> entry : profile.getFragmentIdToProfile().entrySet()) { - int fragmentId = entry.getKey(); - List<TDetailedReportParams> fragmentProfile = entry.getValue(); - int pipelineIdx = 0; - List<RuntimeProfile> taskProfile = Lists.newArrayList(); - for (TDetailedReportParams pipelineProfile : fragmentProfile) { - String name = "Pipeline :" + pipelineIdx + " " - + " (host=" + backendHBAddress + ")"; - RuntimeProfile profileNode = new RuntimeProfile(name); - taskProfile.add(profileNode); - if (!pipelineProfile.isSetProfile()) { - LOG.warn("Profile is not set, {}", DebugUtil.printId(profile.getQueryId())); - return new Status(TStatusCode.INVALID_ARGUMENT, "Profile is not set"); - } - - profileNode.update(pipelineProfile.profile); - profileNode.setIsDone(isDone); - pipelineIdx++; - fragmentProfiles.get(fragmentId).addChild(profileNode); - } - multiBeProfile.get(fragmentId).put(backendHBAddress, taskProfile); - } - } else { - if (!profile.isSetInstanceProfiles() || !profile.isSetFragmentInstanceIds()) { - LOG.warn("InstanceIdToProfile is not set, {}", DebugUtil.printId(profile.getQueryId())); - return new Status(TStatusCode.INVALID_ARGUMENT, "InstanceIdToProfile is not set"); - } - - if (profile.fragment_instance_ids.size() != profile.instance_profiles.size()) { - LOG.warn("InstanceIdToProfile size is not equal, {}", - DebugUtil.printId(profile.getQueryId())); - return new Status(TStatusCode.INVALID_ARGUMENT, "InstanceIdToProfile size is not equal"); - } + if (!profile.isSetFragmentIdToProfile()) { + LOG.warn("{} FragmentIdToProfile is not set", DebugUtil.printId(profile.getQueryId())); + return new Status(TStatusCode.INVALID_ARGUMENT, "FragmentIdToProfile is not set"); + } - for (int idx = 0; idx < profile.getFragmentInstanceIdsSize(); idx++) { - TUniqueId instanceId = profile.getFragmentInstanceIds().get(idx); - TRuntimeProfileTree instanceProfile = profile.getInstanceProfiles().get(idx); - if (instanceProfile == null) { - LOG.warn("Profile is not set {}", DebugUtil.printId(profile.getQueryId())); + for (Entry<Integer, List<TDetailedReportParams>> entry : profile.getFragmentIdToProfile().entrySet()) { + int fragmentId = entry.getKey(); + List<TDetailedReportParams> fragmentProfile = entry.getValue(); + int pipelineIdx = 0; + List<RuntimeProfile> taskProfile = Lists.newArrayList(); + for (TDetailedReportParams pipelineProfile : fragmentProfile) { + String name = "Pipeline :" + pipelineIdx + " " + + " (host=" + backendHBAddress + ")"; + RuntimeProfile profileNode = new RuntimeProfile(name); + taskProfile.add(profileNode); + if (!pipelineProfile.isSetProfile()) { + LOG.warn("Profile is not set, {}", DebugUtil.printId(profile.getQueryId())); return new Status(TStatusCode.INVALID_ARGUMENT, "Profile is not set"); } - PlanFragmentId fragmentId = instanceIdToFragmentId.get(instanceId); - if (fragmentId == null) { - LOG.warn("Could not find related fragment for instance {}", - DebugUtil.printId(instanceId)); - return new Status(TStatusCode.INVALID_ARGUMENT, "Could not find related fragment"); - } - - // Do not use fragment id in params, because non-pipeline engine will set it to -1 - Map<TUniqueId, RuntimeProfile> instanceProfiles = fragmentInstancesProfiles.get(fragmentId); - if (instanceProfiles == null) { - LOG.warn("Could not find related instances for fragment {}", fragmentId); - return new Status(TStatusCode.INVALID_ARGUMENT, "Could not find related instance"); - } - - RuntimeProfile curInstanceProfile = instanceProfiles.get(instanceId); - if (curInstanceProfile == null) { - LOG.warn("Could not find related profile {}", DebugUtil.printId(instanceId)); - return new Status(TStatusCode.INVALID_ARGUMENT, "Could not find related instance"); - } - curInstanceProfile.setIsDone(isDone); - curInstanceProfile.update(instanceProfile); + profileNode.update(pipelineProfile.profile); + profileNode.setIsDone(isDone); + pipelineIdx++; + fragmentProfiles.get(fragmentId).addChild(profileNode); } + multiBeProfile.get(fragmentId).put(backendHBAddress, taskProfile); } if (profile.isSetLoadChannelProfiles()) { @@ -348,59 +257,32 @@ public class ExecutionProfile { LOG.warn("backend id is not set in report profile request, bad message"); return; } - if (isPipelineXProfile) { - int pipelineIdx = 0; - List<RuntimeProfile> taskProfile = Lists.newArrayList(); - String suffix = " (host=" + backend.getHeartbeatAddress() + ")"; - for (TDetailedReportParams param : params.detailed_report) { - String name = param.isSetIsFragmentLevel() && param.is_fragment_level ? "Fragment Level Profile: " - + suffix : "Pipeline :" + pipelineIdx + " " + suffix; - RuntimeProfile profile = new RuntimeProfile(name); - taskProfile.add(profile); - if (param.isSetProfile()) { - profile.update(param.profile); - } - if (params.done) { - profile.setIsDone(true); - } - pipelineIdx++; - profile.sortChildren(); - fragmentProfiles.get(params.fragment_id).addChild(profile); - } - // TODO ygl: is this right? there maybe multi Backends, what does - // update load profile do??? - if (params.isSetLoadChannelProfile()) { - loadChannelProfile.update(params.loadChannelProfile); - } - multiBeProfile.get(params.fragment_id).put(backend.getHeartbeatAddress(), taskProfile); - } else { - PlanFragmentId fragmentId = instanceIdToFragmentId.get(params.fragment_instance_id); - if (fragmentId == null) { - LOG.warn("Could not find related fragment for instance {}", - DebugUtil.printId(params.fragment_instance_id)); - return; - } - // Do not use fragment id in params, because non-pipeline engine will set it to -1 - Map<TUniqueId, RuntimeProfile> instanceProfiles = fragmentInstancesProfiles.get(fragmentId); - if (instanceProfiles == null) { - LOG.warn("Could not find related instances for fragment {}", fragmentId); - return; - } - RuntimeProfile instanceProfile = instanceProfiles.get(params.fragment_instance_id); - if (instanceProfile == null) { - LOG.warn("Could not find related instance {}", params.fragment_instance_id); - return; - } - if (params.isSetProfile()) { - instanceProfile.update(params.profile); - } - if (params.isSetDone() && params.isDone()) { - instanceProfile.setIsDone(true); + + int pipelineIdx = 0; + List<RuntimeProfile> taskProfile = Lists.newArrayList(); + String suffix = " (host=" + backend.getHeartbeatAddress() + ")"; + for (TDetailedReportParams param : params.detailed_report) { + String name = param.isSetIsFragmentLevel() && param.is_fragment_level ? "Fragment Level Profile: " + + suffix : "Pipeline :" + pipelineIdx + " " + suffix; + RuntimeProfile profile = new RuntimeProfile(name); + taskProfile.add(profile); + if (param.isSetProfile()) { + profile.update(param.profile); } - if (params.isSetLoadChannelProfile()) { - loadChannelProfile.update(params.loadChannelProfile); + if (params.done) { + profile.setIsDone(true); } + pipelineIdx++; + profile.sortChildren(); + fragmentProfiles.get(params.fragment_id).addChild(profile); + } + // TODO ygl: is this right? there maybe multi Backends, what does + // update load profile do??? + if (params.isSetLoadChannelProfile()) { + loadChannelProfile.update(params.loadChannelProfile); } + + multiBeProfile.get(params.fragment_id).put(backend.getHeartbeatAddress(), taskProfile); } // MultiInstances may update the profile concurrently diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java index c0d3614550d..12b3b903880 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java @@ -79,9 +79,6 @@ public class Profile { LOG.warn("try to set a null excecution profile, it is abnormal", new Exception()); return; } - if (this.isPipelineX) { - executionProfile.setPipelineX(); - } executionProfile.setSummaryProfile(summaryProfile); this.executionProfiles.add(executionProfile); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 265d3afdb35..94e7d59625a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -387,10 +387,6 @@ public class Coordinator implements CoordInterface { // https://github.com/apache/doris/blob/bd6f5b6a0e5f1b12744607336123d7f97eb76af9/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java#L155 this.enablePipelineEngine = Config.enable_pipeline_load; this.enablePipelineXEngine = Config.enable_pipeline_load; - // make sure Coordinator can update profile correctlly - if (this.enablePipelineXEngine) { - this.executionProfile.setPipelineX(); - } } private void setFromUserProperty(ConnectContext connectContext) { @@ -3248,16 +3244,7 @@ public class Coordinator implements CoordInterface { this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime(); this.enablePipelineX = enablePipelineX; - if (this.enablePipelineX) { - executionProfile.addFragmentBackend(fragmentId, backendId); - } else { - for (TPipelineInstanceParams instanceParam : rpcParams.local_params) { - String profileName = "Instance " + DebugUtil.printId(instanceParam.fragment_instance_id) - + " (host=" + address + ")"; - executionProfile.addInstanceProfile(fragmentId, instanceParam.fragment_instance_id, - new RuntimeProfile(profileName)); - } - } + executionProfile.addFragmentBackend(fragmentId, backendId); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org