This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 7d5a6abb9c2 [refactor](profile) refactor profile report on BE (#33331) 7d5a6abb9c2 is described below commit 7d5a6abb9c2439e29b89bad9b8a35ee1284adf41 Author: zhiqiang <seuhezhiqi...@163.com> AuthorDate: Wed Apr 17 13:54:03 2024 +0800 [refactor](profile) refactor profile report on BE (#33331) First task of #33744. During the close stage, PipelineXFragmentContext collects its profiles and registers them with QueryContext. The destructor of QueryContext adds all of its profiles to RuntimeQueryStatiticsMgr and triggers a report thread to report them. The report threads of RuntimeQueryStatiticsMgr collect all registered profiles and report them query by query. RuntimeQueryStatiticsMgr creates 5 threads by default (configurable via report_exec_status_thread_num) to do the profile report task. When BE stops gracefully, these threads flush all remaining profiles, which is important in cloud mode. --- be/src/common/config.cpp | 2 + be/src/common/config.h | 1 + be/src/pipeline/pipeline_fragment_context.h | 2 +- .../pipeline_x/pipeline_x_fragment_context.cpp | 90 ++++- .../pipeline_x/pipeline_x_fragment_context.h | 3 + be/src/runtime/exec_env_init.cpp | 5 +- be/src/runtime/fragment_mgr.cpp | 59 +++ be/src/runtime/fragment_mgr.h | 5 + be/src/runtime/plan_fragment_executor.cpp | 33 ++ be/src/runtime/plan_fragment_executor.h | 4 + be/src/runtime/query_context.cpp | 181 ++++++++- be/src/runtime/query_context.h | 58 +++ be/src/runtime/runtime_query_statistics_mgr.cpp | 421 +++++++++++++++++++++ be/src/runtime/runtime_query_statistics_mgr.h | 70 ++++ be/src/service/backend_service.cpp | 26 ++ be/src/service/backend_service.h | 3 + .../doris/common/profile/ExecutionProfile.java | 79 ++++ .../java/org/apache/doris/qe/QeProcessorImpl.java | 33 ++ .../org/apache/doris/common/GenericPoolTest.java | 8 + .../apache/doris/utframe/MockedBackendFactory.java | 8 + gensrc/thrift/BackendService.thrift | 13 + gensrc/thrift/FrontendService.thrift | 22 +- 22 files changed, 1112 insertions(+), 14 
deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 13acfd49042..603b1739062 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1153,6 +1153,8 @@ DEFINE_mInt32(report_query_statistics_interval_ms, "3000"); // 30s DEFINE_mInt32(query_statistics_reserve_timeout_ms, "30000"); +DEFINE_mInt32(report_exec_status_thread_num, "5"); + // consider two high usage disk at the same available level if they do not exceed this diff. DEFINE_mDouble(high_disk_avail_level_diff_usages, "0.15"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 9f226e22be2..2aca6a31fbb 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1235,6 +1235,7 @@ DECLARE_Int32(ignore_invalid_partition_id_rowset_num); DECLARE_mInt32(report_query_statistics_interval_ms); DECLARE_mInt32(query_statistics_reserve_timeout_ms); +DECLARE_mInt32(report_exec_status_thread_num); // consider two high usage disk at the same available level if they do not exceed this diff. 
DECLARE_mDouble(high_disk_avail_level_diff_usages); diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 009a2a2f22d..b9bfcb28f68 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -156,7 +156,7 @@ protected: ExecEnv* _exec_env = nullptr; - bool _prepared = false; + std::atomic_bool _prepared = false; bool _submitted = false; std::mutex _status_lock; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index f23e39472ab..3b03fbbf400 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -21,6 +21,7 @@ #include <gen_cpp/PaloInternalService_types.h> #include <gen_cpp/PlanNodes_types.h> #include <gen_cpp/Planner_types.h> +#include <gen_cpp/RuntimeProfile_types.h> #include <pthread.h> #include <runtime/result_buffer_mgr.h> @@ -1378,6 +1379,12 @@ void PipelineXFragmentContext::_close_fragment_instance() { LOG_INFO("Query {} fragment {} profile:\n {}", print_id(this->_query_id), this->_fragment_id, ss.str()); } + + if (_query_ctx->enable_profile()) { + _query_ctx->add_fragment_profile_x(_fragment_id, collect_realtime_profile_x(), + collect_realtime_load_channel_profile_x()); + } + // all submitted tasks done _exec_env->fragment_mgr()->remove_pipeline_context( std::dynamic_pointer_cast<PipelineXFragmentContext>(shared_from_this())); @@ -1409,15 +1416,82 @@ Status PipelineXFragmentContext::send_report(bool done) { for (auto& task_state : _task_runtime_states) { runtime_states.push_back(task_state.get()); } + + ReportStatusRequest req {true, + exec_status, + runtime_states, + nullptr, + _runtime_state->load_channel_profile(), + done || !exec_status.ok(), + _query_ctx->coord_addr, + _query_id, + _fragment_id, + TUniqueId(), + _backend_num, + _runtime_state.get(), + [this](Status st) { return 
update_status(st); }, + [this](const PPlanFragmentCancelReason& reason, + const std::string& msg) { cancel(reason, msg); }}; + return _report_status_cb( - {true, exec_status, runtime_states, nullptr, _runtime_state->load_channel_profile(), - done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id, - TUniqueId(), _backend_num, _runtime_state.get(), - [this](Status st) { return update_status(st); }, - [this](const PPlanFragmentCancelReason& reason, const std::string& msg) { - cancel(reason, msg); - }}, - std::dynamic_pointer_cast<PipelineXFragmentContext>(shared_from_this())); + req, std::dynamic_pointer_cast<PipelineXFragmentContext>(shared_from_this())); +} + +std::vector<std::shared_ptr<TRuntimeProfileTree>> +PipelineXFragmentContext::collect_realtime_profile_x() const { + std::vector<std::shared_ptr<TRuntimeProfileTree>> res; + DCHECK(_query_ctx->enable_pipeline_x_exec() == true) + << fmt::format("Query {} calling a pipeline X function, but its pipeline X is disabled", + print_id(this->_query_id)); + + // we do not have mutex to protect pipeline_id_to_profile + // so we need to make sure this funciton is invoked after fragment context + // has already been prepared. + if (!this->_prepared) { + std::string msg = + "Query " + print_id(this->_query_id) + " collecting profile, but its not prepared"; + DCHECK(false) << msg; + LOG_ERROR(msg); + return res; + } + + // pipeline_id_to_profile is initialized in prepare stage + for (auto& pipeline_profile : _runtime_state->pipeline_id_to_profile()) { + auto profile_ptr = std::make_shared<TRuntimeProfileTree>(); + pipeline_profile->to_thrift(profile_ptr.get()); + res.push_back(profile_ptr); + } + + return res; +} + +std::shared_ptr<TRuntimeProfileTree> +PipelineXFragmentContext::collect_realtime_load_channel_profile_x() const { + // we do not have mutex to protect pipeline_id_to_profile + // so we need to make sure this funciton is invoked after fragment context + // has already been prepared. 
+ if (!this->_prepared) { + std::string msg = + "Query " + print_id(this->_query_id) + " collecting profile, but its not prepared"; + DCHECK(false) << msg; + LOG_ERROR(msg); + return nullptr; + } + + for (auto& runtime_state : _task_runtime_states) { + if (runtime_state->runtime_profile() == nullptr) { + continue; + } + + auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>(); + + runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get()); + this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile); + } + + auto load_channel_profile = std::make_shared<TRuntimeProfileTree>(); + this->_runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get()); + return load_channel_profile; } std::string PipelineXFragmentContext::debug_string() { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 34d00c07652..31febc0d8aa 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -112,6 +112,9 @@ public: [[nodiscard]] int max_sink_operator_id() const { return _sink_operator_id; } + std::vector<std::shared_ptr<TRuntimeProfileTree>> collect_realtime_profile_x() const; + std::shared_ptr<TRuntimeProfileTree> collect_realtime_load_channel_profile_x() const; + std::string debug_string() override; private: diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 76c877c3695..37cd51d1ca4 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -308,7 +308,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths, _workload_sched_mgr->start(this); RETURN_IF_ERROR(_spill_stream_mgr->init()); - + _runtime_query_statistics_mgr->start_report_thread(); _s_ready = true; return Status::OK(); @@ -606,6 +606,9 @@ void ExecEnv::destroy() { _storage_engine.reset(); SAFE_STOP(_spill_stream_mgr); + if 
(_runtime_query_statistics_mgr) { + _runtime_query_statistics_mgr->stop_report_thread(); + } SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool); SAFE_SHUTDOWN(_s3_file_upload_thread_pool); SAFE_SHUTDOWN(_join_node_thread_pool); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 14f77a98ace..bc80dca583d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -29,6 +29,7 @@ #include <gen_cpp/PlanNodes_types.h> #include <gen_cpp/Planner_types.h> #include <gen_cpp/QueryPlanExtra_types.h> +#include <gen_cpp/RuntimeProfile_types.h> #include <gen_cpp/Types_types.h> #include <gen_cpp/internal_service.pb.h> #include <pthread.h> @@ -36,6 +37,7 @@ #include <thrift/Thrift.h> #include <thrift/protocol/TDebugProtocol.h> #include <thrift/transport/TTransportException.h> +#include <unistd.h> #include <atomic> @@ -47,6 +49,7 @@ #include <memory> #include <mutex> #include <sstream> +#include <unordered_map> #include <utility> #include "common/config.h" @@ -483,6 +486,7 @@ void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex std::chrono::system_clock::now().time_since_epoch()) .count(); std::lock_guard<std::mutex> lock(_lock); + _fragment_instance_map.erase(fragment_executor->fragment_instance_id()); g_fragment_executing_count << -1; @@ -1612,4 +1616,59 @@ void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_i } } +Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id, + TReportExecStatusParams* exec_status) { + if (exec_status == nullptr) { + return Status::InvalidArgument("exes_status is nullptr"); + } + + std::shared_ptr<QueryContext> query_context = nullptr; + + { + std::lock_guard<std::mutex> lock(_lock); + query_context = _query_ctx_map[query_id]; + } + + if (query_context == nullptr) { + return Status::NotFound("Query {} not found", print_id(query_id)); + } + + if (query_context->enable_pipeline_x_exec()) { + *exec_status = 
query_context->get_realtime_exec_status_x(); + } else { + auto instance_ids = query_context->get_fragment_instance_ids(); + std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> instance_profiles; + std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles; + + for (auto& instance_id : instance_ids) { + std::shared_ptr<PlanFragmentExecutor> instance_executor = nullptr; + + { + std::lock_guard<std::mutex> lock(_lock); + instance_executor = _fragment_instance_map[instance_id]; + } + + if (instance_executor == nullptr) { + return Status::NotFound("Fragment instance {} not found", print_id(instance_id)); + } + + if (auto instance_profile = instance_executor->collect_realtime_query_profile()) { + instance_profiles.insert(std::make_pair(instance_id, instance_profile)); + } else { + continue; + } + + if (auto load_channel_profile = + instance_executor->collect_realtime_load_channel_profile()) { + load_channel_profiles.push_back(load_channel_profile); + } + } + + *exec_status = RuntimeQueryStatiticsMgr::create_report_exec_status_params_non_pipeline( + query_id, instance_profiles, load_channel_profiles); + } + + return Status::OK(); +} + } // namespace doris diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index e748c88cece..25b555f4fe8 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -17,6 +17,7 @@ #pragma once +#include <gen_cpp/FrontendService_types.h> #include <gen_cpp/Types_types.h> #include <gen_cpp/types.pb.h> #include <stdint.h> @@ -33,6 +34,7 @@ #include "common/status.h" #include "gutil/ref_counted.h" #include "http/rest_monitor_iface.h" +#include "runtime/plan_fragment_executor.h" #include "runtime/query_context.h" #include "runtime_filter_mgr.h" #include "util/countdown_latch.h" @@ -151,6 +153,9 @@ public: void get_runtime_query_info(std::vector<WorkloadQueryInfo>* _query_info_list); + Status get_realtime_exec_status(const TUniqueId& query_id, + TReportExecStatusParams* 
exec_status); + private: void cancel_unlocked_impl(const TUniqueId& id, const PPlanFragmentCancelReason& reason, const std::unique_lock<std::mutex>& state_lock, bool is_pipeline, diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 7bf4f3846dd..5c0668ef55c 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -25,11 +25,13 @@ #include <gen_cpp/Metrics_types.h> #include <gen_cpp/PlanNodes_types.h> #include <gen_cpp/Planner_types.h> +#include <gen_cpp/RuntimeProfile_types.h> #include <pthread.h> #include <stdint.h> #include <stdlib.h> // IWYU pragma: no_include <bits/chrono.h> #include <chrono> // IWYU pragma: keep +#include <memory> #include <ostream> #include <typeinfo> #include <utility> @@ -638,10 +640,41 @@ void PlanFragmentExecutor::close() { load_channel_profile()->pretty_print(&ss); } LOG(INFO) << ss.str(); + + _query_ctx->add_instance_profile(_fragment_instance_id, + collect_realtime_query_profile(), + collect_realtime_load_channel_profile()); } } _closed = true; } +std::shared_ptr<TRuntimeProfileTree> PlanFragmentExecutor::collect_realtime_query_profile() const { + std::shared_ptr<TRuntimeProfileTree> res = std::make_shared<TRuntimeProfileTree>(); + + if (_runtime_state != nullptr) { + _runtime_state->runtime_profile()->compute_time_in_profile(); + _runtime_state->runtime_profile()->to_thrift(res.get()); + } else { + return nullptr; + } + + return res; +} + +std::shared_ptr<TRuntimeProfileTree> PlanFragmentExecutor::collect_realtime_load_channel_profile() + const { + std::shared_ptr<TRuntimeProfileTree> res = std::make_shared<TRuntimeProfileTree>(); + + if (_runtime_state != nullptr) { + _runtime_state->load_channel_profile()->compute_time_in_profile(); + _runtime_state->load_channel_profile()->to_thrift(res.get()); + } else { + return nullptr; + } + + return res; +} + } // namespace doris diff --git a/be/src/runtime/plan_fragment_executor.h 
b/be/src/runtime/plan_fragment_executor.h index 89b2534b61b..e4d29af9ae9 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -21,6 +21,7 @@ #pragma once #include <gen_cpp/PaloInternalService_types.h> +#include <gen_cpp/RuntimeProfile_types.h> #include <gen_cpp/Types_types.h> #include <gen_cpp/types.pb.h> @@ -146,6 +147,9 @@ public: Status update_status(Status status); + std::shared_ptr<TRuntimeProfileTree> collect_realtime_query_profile() const; + std::shared_ptr<TRuntimeProfileTree> collect_realtime_load_channel_profile() const; + private: ExecEnv* _exec_env = nullptr; // not owned ExecNode* _plan = nullptr; // lives in _runtime_state->obj_pool() diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 10f54255741..0a76a13cac9 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -17,15 +17,31 @@ #include "runtime/query_context.h" +#include <fmt/core.h> +#include <gen_cpp/FrontendService_types.h> +#include <gen_cpp/RuntimeProfile_types.h> +#include <gen_cpp/Types_types.h> +#include <glog/logging.h> + #include <exception> #include <memory> +#include <mutex> +#include <sstream> +#include <utility> +#include "common/logging.h" +#include "olap/olap_common.h" #include "pipeline/pipeline_fragment_context.h" #include "pipeline/pipeline_x/dependency.h" +#include "pipeline/pipeline_x/pipeline_x_fragment_context.h" +#include "runtime/exec_env.h" +#include "runtime/fragment_mgr.h" #include "runtime/runtime_query_statistics_mgr.h" +#include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "runtime/workload_group/workload_group_manager.h" #include "util/mem_info.h" +#include "util/uid_util.h" namespace doris { @@ -117,7 +133,11 @@ QueryContext::~QueryContext() { } _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id)); - LOG_INFO("Query {} deconstructed, {}", print_id(_query_id), mem_tracker_msg); + + if 
(enable_profile()) { + _report_query_profile(); + } + // Not release the the thread token in query context's dector method, because the query // conext may be dectored in the thread token it self. It is very dangerous and may core. // And also thread token need shutdown, it may take some time, may cause the thread that @@ -146,6 +166,8 @@ QueryContext::~QueryContext() { _runtime_predicates.clear(); file_scan_range_params_map.clear(); obj_pool.clear(); + + LOG_INFO("Query {} deconstructed, {}", print_id(this->_query_id), mem_tracker_msg); } void QueryContext::set_ready_to_execute(bool is_cancelled) { @@ -303,4 +325,161 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) { return Status::OK(); } +void QueryContext::add_fragment_profile_x( + int fragment_id, const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles, + std::shared_ptr<TRuntimeProfileTree> load_channel_profile) { + if (pipeline_profiles.empty()) { + std::string msg = fmt::format("Add pipeline profile failed, query {}, fragment {}", + print_id(this->_query_id), fragment_id); + LOG_ERROR(msg); + DCHECK(false) << msg; + return; + } + +#ifndef NDEBUG + for (const auto& p : pipeline_profiles) { + DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed, query {}, fragment {}", + print_id(this->_query_id), fragment_id); + } +#endif + + std::lock_guard<std::mutex> l(_profile_mutex); + LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline profile count {} ", + print_id(this->_query_id), fragment_id, pipeline_profiles.size()); + + _profile_map_x.insert(std::make_pair(fragment_id, pipeline_profiles)); + + if (load_channel_profile != nullptr) { + _load_channel_profile_map_x.insert(std::make_pair(fragment_id, load_channel_profile)); + } +} + +void QueryContext::add_instance_profile(const TUniqueId& instance_id, + std::shared_ptr<TRuntimeProfileTree> profile, + std::shared_ptr<TRuntimeProfileTree> load_channel_profile) { + DCHECK(profile != nullptr) << 
print_id(instance_id); + + std::lock_guard<std::mutex> lg(_profile_mutex); + _profile_map.insert(std::make_pair(instance_id, profile)); + if (load_channel_profile != nullptr) { + _load_channel_profile_map.insert(std::make_pair(instance_id, load_channel_profile)); + } +} + +void QueryContext::_report_query_profile() { + _report_query_profile_x(); + _report_query_profile_non_pipeline(); +} + +void QueryContext::_report_query_profile_non_pipeline() { + if (enable_pipeline_exec() || enable_pipeline_x_exec()) { + return; + } + + std::lock_guard<std::mutex> lg(_profile_mutex); + LOG_INFO("Query {}, register query profile, instance profile count {}", print_id(_query_id), + _profile_map.size()); + + for (auto& [instance_id, instance_profile] : _profile_map) { + std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr; + if (_load_channel_profile_map.contains(instance_id)) { + load_channel_profile = _load_channel_profile_map[instance_id]; + } + + ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_instance_profile( + _query_id, this->coord_addr, instance_id, instance_profile, load_channel_profile); + } + + ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile(); +} + +void QueryContext::_report_query_profile_x() { + if (!enable_pipeline_x_exec()) { + return; + } + + std::lock_guard<std::mutex> lg(_profile_mutex); + LOG_INFO( + "Pipeline x query context, register query profile, query {}, fragment profile count {}", + print_id(_query_id), _profile_map_x.size()); + + for (auto& [fragment_id, fragment_profile] : _profile_map_x) { + std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr; + + if (_load_channel_profile_map_x.contains(fragment_id)) { + load_channel_profile = _load_channel_profile_map_x[fragment_id]; + } + + ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile_x( + _query_id, this->coord_addr, fragment_id, fragment_profile, load_channel_profile); + } + + 
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile(); +} + +std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> +QueryContext::_collect_realtime_query_profile_x() const { + std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> res; + + if (!enable_pipeline_x_exec()) { + return res; + } + + for (auto& [fragment_id, fragment_ctx_wptr] : _fragment_id_to_pipeline_ctx) { + if (auto fragment_ctx = fragment_ctx_wptr.lock()) { + // In theory, cast result can not be nullptr since we have checked the pipeline X engine above + std::shared_ptr<pipeline::PipelineXFragmentContext> fragment_ctx_x = + std::dynamic_pointer_cast<pipeline::PipelineXFragmentContext>(fragment_ctx); + + if (fragment_ctx_x == nullptr) { + std::string msg = + fmt::format("PipelineXFragmentContext is nullptr, query {} fragment_id: {}", + print_id(_query_id), fragment_id); + LOG_ERROR(msg); + DCHECK(false) << msg; + continue; + } + + auto profile = fragment_ctx_x->collect_realtime_profile_x(); + + if (profile.empty()) { + std::string err_msg = fmt::format( + "Get nothing when collecting profile, query {}, fragment_id: {}", + print_id(_query_id), fragment_id); + LOG_ERROR(err_msg); + DCHECK(false) << err_msg; + continue; + } + + res.insert(std::make_pair(fragment_id, profile)); + } + } + + return res; +} + +TReportExecStatusParams QueryContext::get_realtime_exec_status_x() const { + TReportExecStatusParams exec_status; + + if (enable_pipeline_x_exec()) { + auto realtime_query_profile = _collect_realtime_query_profile_x(); + std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles; + + for (auto load_channel_profile : _load_channel_profile_map_x) { + if (load_channel_profile.second != nullptr) { + load_channel_profiles.push_back(load_channel_profile.second); + } + } + + exec_status = RuntimeQueryStatiticsMgr::create_report_exec_status_params_x( + this->_query_id, realtime_query_profile, load_channel_profiles); + } else { + 
auto msg = fmt::format("Query {} is not pipelineX query", print_id(_query_id)); + LOG_ERROR(msg); + DCHECK(false) << msg; + } + + return exec_status; +} + } // namespace doris diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index c78886997d0..b9a7e780375 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -18,11 +18,14 @@ #pragma once #include <gen_cpp/PaloInternalService_types.h> +#include <gen_cpp/RuntimeProfile_types.h> #include <gen_cpp/Types_types.h> #include <atomic> #include <memory> +#include <mutex> #include <string> +#include <unordered_map> #include "common/config.h" #include "common/factory_creator.h" @@ -32,6 +35,7 @@ #include "runtime/query_statistics.h" #include "runtime/runtime_filter_mgr.h" #include "runtime/runtime_predicate.h" +#include "util/hash_util.hpp" #include "util/threadpool.h" #include "vec/exec/scan/scanner_scheduler.h" #include "vec/runtime/shared_hash_table_controller.h" @@ -238,6 +242,8 @@ public: ThreadPool* get_non_pipe_exec_thread_pool(); + std::vector<TUniqueId> get_fragment_instance_ids() const { return fragment_instance_ids; } + int64_t mem_limit() const { return _bytes_limit; } void set_merge_controller_handler( @@ -351,6 +357,58 @@ private: std::mutex _weighted_mem_lock; int64_t _weighted_consumption = 0; int64_t _weighted_limit = 0; + + std::mutex _profile_mutex; + + // when fragment of pipeline x is closed, it will register its profile to this map by using add_fragment_profile_x + // flatten profile of one fragment: + // Pipeline 0 + // PipelineTask 0 + // Operator 1 + // Operator 2 + // Scanner + // PipelineTask 1 + // Operator 1 + // Operator 2 + // Scanner + // Pipeline 1 + // PipelineTask 2 + // Operator 3 + // PipelineTask 3 + // Operator 3 + // fragment_id -> list<profile> + std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> _profile_map_x; + std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>> _load_channel_profile_map_x; + 
+ // instance_id -> profile + std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> _profile_map; + std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> _load_channel_profile_map; + + void _report_query_profile(); + void _report_query_profile_non_pipeline(); + void _report_query_profile_x(); + + std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> + _collect_realtime_query_profile_x() const; + + std::unordered_map<TUniqueId, std::vector<std::shared_ptr<TRuntimeProfileTree>>> + _collect_realtime_query_profile_non_pipeline() const; + +public: + // when fragment of pipeline x is closed, it will register its profile to this map by using add_fragment_profile_x + void add_fragment_profile_x( + int fragment_id, + const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profile, + std::shared_ptr<TRuntimeProfileTree> load_channel_profile); + + void add_instance_profile(const TUniqueId& iid, std::shared_ptr<TRuntimeProfileTree> profile, + std::shared_ptr<TRuntimeProfileTree> load_channel_profile); + + TReportExecStatusParams get_realtime_exec_status_x() const; + + bool enable_profile() const { + return _query_options.__isset.enable_profile && _query_options.enable_profile; + } }; } // namespace doris diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 9764b0f0507..a59938b7d51 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -17,13 +17,434 @@ #include "runtime/runtime_query_statistics_mgr.h" +#include <gen_cpp/FrontendService_types.h> +#include <gen_cpp/RuntimeProfile_types.h> +#include <gen_cpp/Status_types.h> +#include <gen_cpp/Types_types.h> +#include <thrift/TApplicationException.h> + +#include <condition_variable> +#include <cstdint> +#include <memory> +#include <mutex> +#include <random> +#include <shared_mutex> +#include <string> +#include <tuple> +#include <unordered_map> 
+#include <vector> + +#include "common/logging.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" +#include "runtime/query_context.h" +#include "service/backend_options.h" #include "util/debug_util.h" +#include "util/hash_util.hpp" #include "util/time.h" +#include "util/uid_util.h" #include "vec/core/block.h" namespace doris { +// TODO: Currently this function is only used to report profile. +// In the future, all exec status and query statistics should be reported +// thorough this function. +static Status _do_report_exec_stats_rpc(const TNetworkAddress& coor_addr, + const TReportExecStatusParams& req, + TReportExecStatusResult& res) { + Status client_status; + FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr, + &client_status); + if (!client_status.ok()) { + LOG_WARNING( + "Could not get client rpc client of {} when reporting profiles, reason is {}, " + "not reporting, profile will be lost", + PrintThriftNetworkAddress(coor_addr), client_status.to_string()); + return Status::RpcError("Client rpc client failed"); + } + + try { + try { + rpc_client->reportExecStatus(res, req); + } catch (const apache::thrift::transport::TTransportException& e) { + LOG_WARNING("Transport exception from {}, reason: {}, reopening", + PrintThriftNetworkAddress(coor_addr), e.what()); + client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms); + if (!client_status.ok()) { + LOG_WARNING("Reopen failed, reason: {}", client_status.to_string()); + return Status::RpcError("Open rpc client failed"); + } + + rpc_client->reportExecStatus(res, req); + } + } catch (apache::thrift::TApplicationException& e) { + if (e.getType() == e.UNKNOWN_METHOD) { + LOG_WARNING( + "Failed to report query profile to {} due to {}, usually because the frontend " + "is not upgraded, check the version", + PrintThriftNetworkAddress(coor_addr), e.what()); + } else { + LOG_WARNING( + "Failed to report query profile to {}, reason: {}, you can see fe 
log for " + "details.", + PrintThriftNetworkAddress(coor_addr), e.what()); + } + return Status::RpcError("Send stats failed"); + } catch (std::exception& e) { + LOG_WARNING( + "Failed to report query profile to {}, reason: {}, you can see fe log for details.", + PrintThriftNetworkAddress(coor_addr), e.what()); + return Status::RpcError("Send report query profile failed"); + } + + return Status::OK(); +} + +TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_params_x( + const TUniqueId& query_id, + const std::unordered_map<int32, std::vector<std::shared_ptr<TRuntimeProfileTree>>>& + fragment_id_to_profile, + const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profiles) { + TQueryProfile profile; + profile.__set_query_id(query_id); + + std::map<int32_t, std::vector<TDetailedReportParams>> fragment_id_to_profile_req; + + for (const auto& entry : fragment_id_to_profile) { + int32_t fragment_id = entry.first; + const std::vector<std::shared_ptr<TRuntimeProfileTree>>& fragment_profile = entry.second; + std::vector<TDetailedReportParams> detailed_params; + + for (auto pipeline_profile : fragment_profile) { + if (pipeline_profile == nullptr) { + auto msg = fmt::format("Register fragment profile {} {} failed, profile is null", + print_id(query_id), fragment_id); + DCHECK(false) << msg; + LOG_ERROR(msg); + continue; + } + + TDetailedReportParams tmp; + tmp.__set_profile(*pipeline_profile); + // tmp.fragment_instance_id is not needed for pipeline x + detailed_params.push_back(tmp); + } + + fragment_id_to_profile_req.insert(std::make_pair(fragment_id, detailed_params)); + } + + if (fragment_id_to_profile_req.size() == 0) { + LOG_WARNING("No fragment profile found for query {}", print_id(query_id)); + } + + profile.__set_fragment_id_to_profile(fragment_id_to_profile_req); + + std::vector<TRuntimeProfileTree> load_channel_profiles_req; + for (auto load_channel_profile : load_channel_profiles) { + if (load_channel_profile == nullptr) { + 
auto msg = fmt::format( + "Register fragment profile {} {} failed, load channel profile is null", + print_id(query_id), -1); + DCHECK(false) << msg; + LOG_ERROR(msg); + continue; + } + + load_channel_profiles_req.push_back(*load_channel_profile); + } + + if (load_channel_profiles_req.size() > 0) { + profile.__set_load_channel_profiles(load_channel_profiles_req); + } + + TReportExecStatusParams req; + req.__set_query_profile(profile); + req.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id); + // invalid query id to avoid API compatibility during upgrade + req.__set_query_id(TUniqueId()); + + return req; +} + +TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_params_non_pipeline( + const TUniqueId& query_id, + const std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>& + instance_id_to_profile, + const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profile) { + TQueryProfile profile; + std::vector<TUniqueId> fragment_instance_ids; + std::vector<TRuntimeProfileTree> instance_profiles; + std::vector<TRuntimeProfileTree> load_channel_profiles; + + for (auto entry : instance_id_to_profile) { + TUniqueId instance_id = entry.first; + std::shared_ptr<TRuntimeProfileTree> profile = entry.second; + + if (profile == nullptr) { + auto msg = fmt::format("Register instance profile {} {} failed, profile is null", + print_id(query_id), print_id(instance_id)); + DCHECK(false) << msg; + LOG_ERROR(msg); + continue; + } + + fragment_instance_ids.push_back(instance_id); + instance_profiles.push_back(*profile); + } + + profile.__set_query_id(query_id); + profile.__set_fragment_instance_ids(fragment_instance_ids); + profile.__set_instance_profiles(instance_profiles); + profile.__set_load_channel_profiles(load_channel_profiles); + + TReportExecStatusParams res; + res.__set_query_profile(profile); + // invalid query id to avoid API compatibility during upgrade + res.__set_query_id(TUniqueId()); + 
res.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id); + return res; +} + +void RuntimeQueryStatiticsMgr::start_report_thread() { + if (started.load()) { + DCHECK(false) << "report thread has been started"; + LOG_ERROR("report thread has been started"); + return; + } + + started.store(true); + + for (size_t i = 0; i < config::report_exec_status_thread_num; ++i) { + this->_report_profile_threads.emplace_back(std::make_unique<std::thread>( + &RuntimeQueryStatiticsMgr::report_query_profiles_thread, this)); + } +} + +void RuntimeQueryStatiticsMgr::report_query_profiles_thread() { + while (true) { + { + std::unique_lock<std::mutex> lock(_report_profile_mutex); + + while (_query_profile_map.empty() && _profile_map_x.empty() && + !_report_profile_thread_stop) { + _report_profile_cv.wait_for(lock, std::chrono::seconds(3)); + } + } + + _report_query_profiles_function(); + + { + std::lock_guard<std::mutex> lg(_report_profile_mutex); + + if (_report_profile_thread_stop) { + LOG_INFO("Report profile thread stopped"); + return; + } + } + } +} + +void RuntimeQueryStatiticsMgr::trigger_report_profile() { + std::unique_lock<std::mutex> lock(_report_profile_mutex); + _report_profile_cv.notify_one(); +} + +void RuntimeQueryStatiticsMgr::stop_report_thread() { + if (!started) { + return; + } + + { + std::unique_lock<std::mutex> lock(_report_profile_mutex); + _report_profile_thread_stop = true; + LOG_INFO("All report threads are going to stop"); + _report_profile_cv.notify_all(); + } + + for (const auto& thread : _report_profile_threads) { + thread->join(); + } + + LOG_INFO("All report threads stopped"); +} + +void RuntimeQueryStatiticsMgr::register_instance_profile( + const TUniqueId& query_id, const TNetworkAddress& coor_addr, const TUniqueId& instance_id, + std::shared_ptr<TRuntimeProfileTree> instance_profile, + std::shared_ptr<TRuntimeProfileTree> load_channel_profile) { + if (instance_profile == nullptr) { + auto msg = fmt::format("Register instance profile {} 
{} failed, profile is null", + print_id(query_id), print_id(instance_id)); + DCHECK(false) << msg; + LOG_ERROR(msg); + return; + } + + std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock); + + if (!_query_profile_map.contains(query_id)) { + _query_profile_map[query_id] = std::make_tuple( + coor_addr, std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>()); + } + + std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>& instance_profile_map = + std::get<1>(_query_profile_map[query_id]); + instance_profile_map.insert(std::make_pair(instance_id, instance_profile)); + + if (load_channel_profile != nullptr) { + _load_channel_profile_map[instance_id] = load_channel_profile; + } + + LOG_INFO("Register instance profile {} {}", print_id(query_id), print_id(instance_id)); +} + +void RuntimeQueryStatiticsMgr::register_fragment_profile_x( + const TUniqueId& query_id, const TNetworkAddress& coor_addr, int32_t fragment_id, + std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles, + std::shared_ptr<TRuntimeProfileTree> load_channel_profile_x) { + for (const auto& p : p_profiles) { + if (p == nullptr) { + auto msg = fmt::format("Register fragment profile {} {} failed, profile is null", + print_id(query_id), fragment_id); + DCHECK(false) << msg; + LOG_ERROR(msg); + return; + } + } + + std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock); + + if (!_profile_map_x.contains(query_id)) { + _profile_map_x[query_id] = std::make_tuple( + coor_addr, + std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>()); + } + + std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>& + fragment_profile_map = std::get<1>(_profile_map_x[query_id]); + fragment_profile_map.insert(std::make_pair(fragment_id, p_profiles)); + + if (load_channel_profile_x != nullptr) { + _load_channel_profile_map_x[std::make_pair(query_id, fragment_id)] = load_channel_profile_x; + } + + LOG_INFO("register x profile done {}, fragment 
{}, profiles {}", print_id(query_id), + fragment_id, p_profiles.size()); +} + +void RuntimeQueryStatiticsMgr::_report_query_profiles_non_pipeline() { + // query_id -> {coordinator_addr, {instance_id -> instance_profile}} + decltype(_query_profile_map) profile_copy; + decltype(_load_channel_profile_map) load_channel_profile_copy; + + { + std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock); + _query_profile_map.swap(profile_copy); + _load_channel_profile_map.swap(load_channel_profile_copy); + } + + // query_id -> {coordinator_addr, {instance_id -> instance_profile}} + for (const auto& entry : profile_copy) { + const auto& query_id = entry.first; + const auto& coor_addr = std::get<0>(entry.second); + const std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>& + instance_id_to_profile = std::get<1>(entry.second); + + for (const auto& profile_entry : instance_id_to_profile) { + const auto& instance_id = profile_entry.first; + const auto& instance_profile = profile_entry.second; + + if (instance_profile == nullptr) { + auto msg = fmt::format("Query {} instance {} profile is null", print_id(query_id), + print_id(instance_id)); + DCHECK(false) << msg; + LOG_ERROR(msg); + continue; + } + } + + std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles; + for (const auto& load_channel_profile : load_channel_profile_copy) { + if (load_channel_profile.second == nullptr) { + auto msg = fmt::format( + "Register fragment profile {} {} failed, load channel profile is null", + print_id(query_id), -1); + DCHECK(false) << msg; + LOG_ERROR(msg); + continue; + } + + load_channel_profiles.push_back(load_channel_profile.second); + } + + TReportExecStatusParams req = create_report_exec_status_params_non_pipeline( + query_id, instance_id_to_profile, load_channel_profiles); + TReportExecStatusResult res; + auto rpc_status = _do_report_exec_stats_rpc(coor_addr, req, res); + + if (res.status.status_code != TStatusCode::OK) { + std::stringstream ss; + 
res.status.printTo(ss); + LOG_WARNING("Query {} send profile to {} failed, msg: {}", print_id(query_id), + PrintThriftNetworkAddress(coor_addr), ss.str()); + } else { + LOG_INFO("Send {} profile finished", print_id(query_id)); + } + } +} + +void RuntimeQueryStatiticsMgr::_report_query_profiles_x() { + decltype(_profile_map_x) profile_copy; + decltype(_load_channel_profile_map_x) load_channel_profile_copy; + + { + std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock); + _profile_map_x.swap(profile_copy); + _load_channel_profile_map_x.swap(load_channel_profile_copy); + } + + // query_id -> {coordinator_addr, {fragment_id -> std::vector<pipeline_profile>}} + for (const auto& entry : profile_copy) { + const auto& query_id = entry.first; + const auto& coor_addr = std::get<0>(entry.second); + const auto& fragment_profile_map = std::get<1>(entry.second); + + if (fragment_profile_map.empty()) { + auto msg = fmt::format("Query {} does not have profile", print_id(query_id)); + DCHECK(false) << msg; + LOG_ERROR(msg); + continue; + } + + std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles; + for (auto load_channel_profile : load_channel_profile_copy) { + if (load_channel_profile.second == nullptr) { + auto msg = fmt::format( + "Register fragment profile {} {} failed, load channel profile is null", + print_id(query_id), -1); + DCHECK(false) << msg; + LOG_ERROR(msg); + continue; + } + + load_channel_profiles.push_back(load_channel_profile.second); + } + + TReportExecStatusParams req = create_report_exec_status_params_x( + query_id, fragment_profile_map, load_channel_profiles); + TReportExecStatusResult res; + + auto rpc_status = _do_report_exec_stats_rpc(coor_addr, req, res); + + if (res.status.status_code != TStatusCode::OK || + res.status.status_code != TStatusCode::OK) { + LOG_WARNING("Query {} send profile to {} failed", print_id(query_id), + PrintThriftNetworkAddress(coor_addr)); + } else { + LOG_INFO("Send {} profile succeed", 
print_id(query_id)); + } + } +} void QueryStatisticsCtx::collect_query_statistics(TQueryStatistics* tq_s) { QueryStatistics tmp_qs; diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h index 1b3e164d48f..088dd39be55 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.h +++ b/be/src/runtime/runtime_query_statistics_mgr.h @@ -18,12 +18,22 @@ #pragma once #include <gen_cpp/Data_types.h> +#include <gen_cpp/RuntimeProfile_types.h> +#include <gen_cpp/Types_types.h> +#include <condition_variable> +#include <cstdint> +#include <memory> +#include <mutex> #include <shared_mutex> #include <string> +#include <thread> +#include <unordered_map> +#include "gutil/integral_types.h" #include "runtime/query_statistics.h" #include "runtime/workload_management/workload_condition.h" +#include "util/hash_util.hpp" #include "util/time.h" namespace doris { @@ -57,6 +67,18 @@ public: RuntimeQueryStatiticsMgr() = default; ~RuntimeQueryStatiticsMgr() = default; + static TReportExecStatusParams create_report_exec_status_params_x( + const TUniqueId& q_id, + const std::unordered_map<int32, std::vector<std::shared_ptr<TRuntimeProfileTree>>>& + fragment_id_to_profile, + const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profile); + + static TReportExecStatusParams create_report_exec_status_params_non_pipeline( + const TUniqueId& q_id, + const std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>& + instance_id_to_profile, + const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profile); + void register_query_statistics(std::string query_id, std::shared_ptr<QueryStatistics> qs_ptr, TNetworkAddress fe_addr); @@ -75,9 +97,57 @@ public: // used for backend_active_tasks void get_active_be_tasks_block(vectorized::Block* block); + void start_report_thread(); + void report_query_profiles_thread(); + void trigger_report_profile(); + void stop_report_thread(); + + void register_instance_profile(const 
TUniqueId& query_id, const TNetworkAddress& coor_addr, + const TUniqueId& instance_id, + std::shared_ptr<TRuntimeProfileTree> instance_profile, + std::shared_ptr<TRuntimeProfileTree> load_channel_profile); + + void register_fragment_profile_x(const TUniqueId& query_id, const TNetworkAddress& const_addr, + int32_t fragment_id, + std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles, + std::shared_ptr<TRuntimeProfileTree> load_channel_profile_x); + private: std::shared_mutex _qs_ctx_map_lock; std::map<std::string, std::unique_ptr<QueryStatisticsCtx>> _query_statistics_ctx_map; + + std::mutex _report_profile_mutex; + std::atomic_bool started = false; + std::vector<std::unique_ptr<std::thread>> _report_profile_threads; + std::condition_variable _report_profile_cv; + bool _report_profile_thread_stop = false; + + void _report_query_profiles_function() { + _report_query_profiles_x(); + _report_query_profiles_non_pipeline(); + } + + void _report_query_profiles_x(); + void _report_query_profiles_non_pipeline(); + + std::shared_mutex _query_profile_map_lock; + + // query_id -> {coordinator_addr, {fragment_id -> std::vector<pipeline_profile>}} + std::unordered_map< + TUniqueId, + std::tuple<TNetworkAddress, + std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>>> + _profile_map_x; + std::unordered_map<std::pair<TUniqueId, int32_t>, std::shared_ptr<TRuntimeProfileTree>> + _load_channel_profile_map_x; + + // query_id -> {coordinator_addr, {instance_id -> instance_profile}} + std::unordered_map< + TUniqueId, + std::tuple<TNetworkAddress, + std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>>> + _query_profile_map; + std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> _load_channel_profile_map; }; } // namespace doris \ No newline at end of file diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index b1a110144ef..8aab496ed2f 100644 --- a/be/src/service/backend_service.cpp +++ 
b/be/src/service/backend_service.cpp @@ -23,6 +23,7 @@ #include <gen_cpp/BackendService_types.h> #include <gen_cpp/Data_types.h> #include <gen_cpp/DorisExternalService_types.h> +#include <gen_cpp/FrontendService_types.h> #include <gen_cpp/PaloInternalService_types.h> #include <gen_cpp/Planner_types.h> #include <gen_cpp/Status_types.h> @@ -1169,4 +1170,29 @@ void BaseBackendService::warm_up_tablets(TWarmUpTabletsResponse& response, response.__set_status(Status::NotSupported("warm_up_tablets is not implemented").to_thrift()); } +void BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse& response, + const TGetRealtimeExecStatusRequest& request) { + if (!request.__isset.id) { + LOG_WARNING("Invalidate argument, id is empty"); + response.__set_status(Status::InvalidArgument("id is empty").to_thrift()); + } + + LOG_INFO("Getting realtime exec status of query {}", print_id(request.id)); + std::unique_ptr<TReportExecStatusParams> report_exec_status_params = + std::make_unique<TReportExecStatusParams>(); + Status st = ExecEnv::GetInstance()->fragment_mgr()->get_realtime_exec_status( + request.id, report_exec_status_params.get()); + + if (!st.ok()) { + response.__set_status(st.to_thrift()); + return; + } + + report_exec_status_params->__set_query_id(TUniqueId()); + + response.__set_status(Status::OK().to_thrift()); + response.__set_report_exec_status_params(*report_exec_status_params); + return; +} + } // namespace doris diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index 20aaa96685a..b670b21221e 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -137,6 +137,9 @@ public: void query_ingest_binlog(TQueryIngestBinlogResult& result, const TQueryIngestBinlogRequest& request) override; + void get_realtime_exec_status(TGetRealtimeExecStatusResponse& response, + const TGetRealtimeExecStatusRequest& request) override; + 
//////////////////////////////////////////////////////////////////////////// // begin cloud backend functions //////////////////////////////////////////////////////////////////////////// diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java index d1db6ff43e2..bdd51a1a8cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java @@ -19,6 +19,7 @@ package org.apache.doris.common.profile; import org.apache.doris.catalog.Env; import org.apache.doris.common.Pair; +import org.apache.doris.common.Status; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.TimeUtils; @@ -27,7 +28,10 @@ import org.apache.doris.planner.PlanFragmentId; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TDetailedReportParams; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TQueryProfile; import org.apache.doris.thrift.TReportExecStatusParams; +import org.apache.doris.thrift.TRuntimeProfileTree; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.thrift.TUnit; @@ -245,6 +249,81 @@ public class ExecutionProfile { } } + public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddress) { + if (isPipelineXProfile) { + if (!profile.isSetFragmentIdToProfile()) { + return new Status(TStatusCode.INVALID_ARGUMENT, "FragmentIdToProfile is not set"); + } + + for (Entry<Integer, List<TDetailedReportParams>> entry : profile.getFragmentIdToProfile().entrySet()) { + int fragmentId = entry.getKey(); + List<TDetailedReportParams> fragmentProfile = entry.getValue(); + int pipelineIdx = 0; + List<RuntimeProfile> taskProfile = Lists.newArrayList(); + for 
(TDetailedReportParams pipelineProfile : fragmentProfile) { + String name = "Pipeline :" + pipelineIdx + " " + + " (host=" + backendHBAddress + ")"; + RuntimeProfile profileNode = new RuntimeProfile(name); + taskProfile.add(profileNode); + if (!pipelineProfile.isSetProfile()) { + return new Status(TStatusCode.INVALID_ARGUMENT, "Profile is not set"); + } + + profileNode.update(pipelineProfile.profile); + pipelineIdx++; + fragmentProfiles.get(fragmentId).addChild(profileNode); + } + multiBeProfile.get(fragmentId).put(backendHBAddress, taskProfile); + } + } else { + if (!profile.isSetInstanceProfiles() || !profile.isSetFragmentInstanceIds()) { + return new Status(TStatusCode.INVALID_ARGUMENT, "InstanceIdToProfile is not set"); + } + + if (profile.fragment_instance_ids.size() != profile.instance_profiles.size()) { + return new Status(TStatusCode.INVALID_ARGUMENT, "InstanceIdToProfile size is not equal"); + } + + for (int idx = 0; idx < profile.getFragmentInstanceIdsSize(); idx++) { + TUniqueId instanceId = profile.getFragmentInstanceIds().get(idx); + TRuntimeProfileTree instanceProfile = profile.getInstanceProfiles().get(idx); + if (instanceProfile == null) { + return new Status(TStatusCode.INVALID_ARGUMENT, "Profile is not set"); + } + + PlanFragmentId fragmentId = instanceIdToFragmentId.get(instanceId); + if (fragmentId == null) { + LOG.warn("Could not find related fragment for instance {}", + DebugUtil.printId(instanceId)); + return new Status(TStatusCode.INVALID_ARGUMENT, "Could not find related fragment"); + } + + // Do not use fragment id in params, because non-pipeline engine will set it to -1 + Map<TUniqueId, RuntimeProfile> instanceProfiles = fragmentInstancesProfiles.get(fragmentId); + if (instanceProfiles == null) { + LOG.warn("Could not find related instances for fragment {}", fragmentId); + return new Status(TStatusCode.INVALID_ARGUMENT, "Could not find related instance"); + } + + RuntimeProfile curInstanceProfile = instanceProfiles.get(instanceId); + if 
(curInstanceProfile == null) { + LOG.warn("Could not find related profile {}", DebugUtil.printId(instanceId)); + return new Status(TStatusCode.INVALID_ARGUMENT, "Could not find related instance"); + } + + curInstanceProfile.update(instanceProfile); + } + } + + if (profile.isSetLoadChannelProfiles()) { + for (TRuntimeProfileTree loadChannelProfile : profile.getLoadChannelProfiles()) { + this.loadChannelProfile.update(loadChannelProfile); + } + } + + return new Status(TStatusCode.OK, "Success"); + } + public void updateProfile(TReportExecStatusParams params) { Backend backend = null; if (params.isSetBackendId()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index a4cd867a31e..eca02175fcf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -19,6 +19,7 @@ package org.apache.doris.qe; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; +import org.apache.doris.common.Status; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; import org.apache.doris.common.profile.ExecutionProfile; @@ -26,7 +27,9 @@ import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ProfileManager; import org.apache.doris.metric.MetricRepo; import org.apache.doris.resource.workloadgroup.QueueToken.TokenState; +import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TQueryProfile; import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TReportExecStatusResult; @@ -71,6 +74,27 @@ public final class QeProcessorImpl implements QeProcessor { "profile-write-pool", true); } + private Status processQueryProfile(TQueryProfile profile, TNetworkAddress address) { + LOG.info("New 
profile processing API, query {}", DebugUtil.printId(profile.query_id)); + + ExecutionProfile executionProfile = ProfileManager.getInstance().getExecutionProfile(profile.query_id); + if (executionProfile == null) { + LOG.warn("Could not find execution profile with query id {}", DebugUtil.printId(profile.query_id)); + return new Status(TStatusCode.NOT_FOUND, "Could not find execution profile with query id " + + DebugUtil.printId(profile.query_id)); + } + + // Updating the profile may take a long time; use a separate pool to deal with it. + writeProfileExecutor.submit(new Runnable() { + @Override + public void run() { + executionProfile.updateProfile(profile, address); + } + }); + + return Status.OK; + } + @Override public Coordinator getCoordinator(TUniqueId queryId) { QueryInfo queryInfo = coordinatorMap.get(queryId); @@ -206,6 +230,15 @@ public final class QeProcessorImpl implements QeProcessor { @Override public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, TNetworkAddress beAddr) { + if (params.isSetQueryProfile()) { + if (params.isSetBackendId()) { + Backend backend = Env.getCurrentSystemInfo().getBackend(params.getBackendId()); + if (backend != null) { + processQueryProfile(params.getQueryProfile(), backend.getHeartbeatAddress()); + } + } + } + if (params.isSetProfile() || params.isSetLoadChannelProfile()) { LOG.info("ReportExecStatus(): fragment_instance_id={}, query id={}, backend num: {}, ip: {}", DebugUtil.printId(params.fragment_instance_id), DebugUtil.printId(params.query_id), diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java index eb9ac858b3e..81d444a716a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java @@ -31,6 +31,8 @@ import org.apache.doris.thrift.TExecPlanFragmentParams; import 
org.apache.doris.thrift.TExecPlanFragmentResult; import org.apache.doris.thrift.TExportStatusResult; import org.apache.doris.thrift.TExportTaskRequest; +import org.apache.doris.thrift.TGetRealtimeExecStatusRequest; +import org.apache.doris.thrift.TGetRealtimeExecStatusResponse; import org.apache.doris.thrift.TGetTopNHotPartitionsRequest; import org.apache.doris.thrift.TGetTopNHotPartitionsResponse; import org.apache.doris.thrift.TIngestBinlogRequest; @@ -273,6 +275,12 @@ public class GenericPoolTest { public TWarmUpTabletsResponse warmUpTablets(TWarmUpTabletsRequest request) throws TException { return null; } + + @Override + public TGetRealtimeExecStatusResponse getRealtimeExecStatus(TGetRealtimeExecStatusRequest request) + throws TException { + return null; + } } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index c705893c672..441595511dc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -48,6 +48,8 @@ import org.apache.doris.thrift.TExportState; import org.apache.doris.thrift.TExportStatusResult; import org.apache.doris.thrift.TExportTaskRequest; import org.apache.doris.thrift.TFinishTaskRequest; +import org.apache.doris.thrift.TGetRealtimeExecStatusRequest; +import org.apache.doris.thrift.TGetRealtimeExecStatusResponse; import org.apache.doris.thrift.TGetTopNHotPartitionsRequest; import org.apache.doris.thrift.TGetTopNHotPartitionsResponse; import org.apache.doris.thrift.THeartbeatResult; @@ -456,6 +458,12 @@ public class MockedBackendFactory { throws TException { return null; } + + @Override + public TGetRealtimeExecStatusResponse getRealtimeExecStatus(TGetRealtimeExecStatusRequest request) + throws TException { + return null; + } } // The default Brpc service. 
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index fb45ed24809..69918985a16 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -24,6 +24,7 @@ include "PlanNodes.thrift" include "AgentService.thrift" include "PaloInternalService.thrift" include "DorisExternalService.thrift" +include "FrontendService.thrift" struct TExportTaskRequest { 1: required PaloInternalService.TExecPlanFragmentParams params @@ -319,6 +320,16 @@ struct TPublishTopicResult { 1: required Status.TStatus status } +struct TGetRealtimeExecStatusRequest { + // maybe query id or other unique id + 1: optional Types.TUniqueId id +} + +struct TGetRealtimeExecStatusResponse { + 1: optional Status.TStatus status + 2: optional FrontendService.TReportExecStatusParams report_exec_status_params +} + service BackendService { // Called by coord to start asynchronous execution of plan fragment in backend. // Returns as soon as all incoming data streams have been set up. 
@@ -387,4 +398,6 @@ service BackendService { TQueryIngestBinlogResult query_ingest_binlog(1: TQueryIngestBinlogRequest query_ingest_binlog_request); TPublishTopicResult publish_topic_info(1:TPublishTopicRequest topic_request); + + TGetRealtimeExecStatusResponse get_realtime_exec_status(1:TGetRealtimeExecStatusRequest request); } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 61d34d04184..09cd2bf0be2 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -416,6 +416,20 @@ struct TReportWorkloadRuntimeStatusParams { 2: optional map<string, TQueryStatistics> query_statistics_map } +struct TQueryProfile { + 1: optional Types.TUniqueId query_id + + 2: optional map<i32, list<TDetailedReportParams>> fragment_id_to_profile + + // Types.TUniqueId should not be used as key in thrift map, so we use two lists instead + // https://thrift.apache.org/docs/types#containers + 3: optional list<Types.TUniqueId> fragment_instance_ids + // Types.TUniqueId can not be used as key in thrift map, so we use two lists instead + 4: optional list<RuntimeProfile.TRuntimeProfileTree> instance_profiles + + 5: optional list<RuntimeProfile.TRuntimeProfileTree> load_channel_profiles +} + // The results of an INSERT query, sent to the coordinator as part of // TReportExecStatusParams struct TReportExecStatusParams { @@ -443,7 +457,7 @@ struct TReportExecStatusParams { // cumulative profile // required in V1 // Move to TDetailedReportParams for pipelineX - 7: optional RuntimeProfile.TRuntimeProfileTree profile + 7: optional RuntimeProfile.TRuntimeProfileTree profile // to be deprecated // New errors that have not been reported to the coordinator // optional in V1 @@ -473,17 +487,19 @@ struct TReportExecStatusParams { 20: optional PaloInternalService.TQueryType query_type // Move to TDetailedReportParams for pipelineX - 21: optional RuntimeProfile.TRuntimeProfileTree loadChannelProfile + 21: optional 
RuntimeProfile.TRuntimeProfileTree loadChannelProfile // to be deprecated 22: optional i32 finished_scan_ranges - 23: optional list<TDetailedReportParams> detailed_report + 23: optional list<TDetailedReportParams> detailed_report // to be deprecated 24: optional TQueryStatistics query_statistics // deprecated 25: optional TReportWorkloadRuntimeStatusParams report_workload_runtime_status 26: optional list<DataSinks.THivePartitionUpdate> hive_partition_updates + + 27: optional TQueryProfile query_profile } struct TFeResult { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org