This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 16e25f6335b [opt](profile) Avoid unnecessary copies in the profile thrift (#34720) 16e25f6335b is described below commit 16e25f6335b19d90ff83bbaa7a5238a1c55b3402 Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Wed May 22 14:23:59 2024 +0800 [opt](profile) Avoid unnecessary copies in the profile thrift (#34720) --- be/src/runtime/fragment_mgr.cpp | 2 +- be/src/runtime/query_context.cpp | 3 ++- be/src/runtime/runtime_query_statistics_mgr.cpp | 33 +++++++++++++------------ be/src/runtime/runtime_query_statistics_mgr.h | 5 ++-- be/src/service/backend_service.cpp | 13 ++++++++-- be/src/util/runtime_profile.cpp | 9 +++---- be/src/util/runtime_profile.h | 2 +- be/src/util/thrift_client.h | 5 ++++ gensrc/thrift/Makefile | 2 +- 9 files changed, 43 insertions(+), 31 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 40013fb33dd..97a8502cfcc 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -281,7 +281,7 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { detailed_param.__isset.profile = true; detailed_param.__isset.loadChannelProfile = false; pipeline_profile->to_thrift(&detailed_param.profile); - params.detailed_report.push_back(detailed_param); + params.detailed_report.push_back(std::move(detailed_param)); } } } else { diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 5360fbe4e4b..4f63f85e231 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -470,7 +470,8 @@ TReportExecStatusParams QueryContext::get_realtime_exec_status_x() const { } exec_status = RuntimeQueryStatiticsMgr::create_report_exec_status_params_x( - this->_query_id, realtime_query_profile, load_channel_profiles, /*is_done=*/false); + this->_query_id, std::move(realtime_query_profile), + std::move(load_channel_profiles), /*is_done=*/false); } else { auto msg = fmt::format("Query {} is not pipelineX query", print_id(_query_id)); LOG_ERROR(msg); diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 46e73940934..55051eff686 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -41,6 +41,7 @@ #include "service/backend_options.h" #include "util/debug_util.h" #include "util/hash_util.hpp" +#include "util/thrift_client.h" #include "util/time.h" #include "util/uid_util.h" #include "vec/core/block.h" @@ -102,10 +103,10 @@ static Status _do_report_exec_stats_rpc(const TNetworkAddress& coor_addr, TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_params_x( const TUniqueId& query_id, - const std::unordered_map<int32, std::vector<std::shared_ptr<TRuntimeProfileTree>>>& + std::unordered_map<int32, std::vector<std::shared_ptr<TRuntimeProfileTree>>> fragment_id_to_profile, - const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profiles, - bool is_done) { + std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles, bool is_done) { + // This function will clear the data of fragment_id_to_profile and load_channel_profiles. TQueryProfile profile; profile.__set_query_id(query_id); @@ -126,15 +127,15 @@ TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_para } TDetailedReportParams tmp; - tmp.__set_profile(*pipeline_profile); + THRIFT_MOVE_VALUES(tmp, profile, *pipeline_profile); // tmp.fragment_instance_id is not needed for pipeline x - detailed_params.push_back(tmp); + detailed_params.push_back(std::move(tmp)); } - fragment_id_to_profile_req.insert(std::make_pair(fragment_id, detailed_params)); + fragment_id_to_profile_req[fragment_id] = std::move(detailed_params); } - if (fragment_id_to_profile_req.size() == 0) { + if (fragment_id_to_profile_req.empty()) { LOG_WARNING("No fragment profile found for query {}", print_id(query_id)); } @@ -151,15 +152,15 @@ TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_para continue; } - load_channel_profiles_req.push_back(*load_channel_profile); + load_channel_profiles_req.push_back(std::move(*load_channel_profile)); } - if (load_channel_profiles_req.size() > 0) { - profile.__set_load_channel_profiles(load_channel_profiles_req); + if (!load_channel_profiles_req.empty()) { + THRIFT_MOVE_VALUES(profile, load_channel_profiles, load_channel_profiles_req); } TReportExecStatusParams req; - req.__set_query_profile(profile); + THRIFT_MOVE_VALUES(req, query_profile, profile); req.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id); // invalid query id to avoid API compatibility during upgrade req.__set_query_id(TUniqueId()); @@ -408,10 +409,10 @@ void RuntimeQueryStatiticsMgr::_report_query_profiles_x() { } // query_id -> {coordinator_addr, {fragment_id -> std::vectpr<pipeline_profile>}} - for (const auto& entry : profile_copy) { + for (auto& entry : profile_copy) { const auto& query_id = entry.first; const auto& coor_addr = std::get<0>(entry.second); - const auto& fragment_profile_map = std::get<1>(entry.second); + auto& fragment_profile_map = std::get<1>(entry.second); if (fragment_profile_map.empty()) { auto msg = fmt::format("Query {} does not have profile", print_id(query_id)); @@ -435,13 +436,13 @@ void RuntimeQueryStatiticsMgr::_report_query_profiles_x() { } TReportExecStatusParams req = create_report_exec_status_params_x( - query_id, fragment_profile_map, load_channel_profiles, /*is_done=*/true); + query_id, std::move(fragment_profile_map), std::move(load_channel_profiles), + /*is_done=*/true); TReportExecStatusResult res; auto rpc_status = _do_report_exec_stats_rpc(coor_addr, req, res); - if (res.status.status_code != TStatusCode::OK || - res.status.status_code != TStatusCode::OK) { + if (res.status.status_code != TStatusCode::OK || !rpc_status.ok()) { LOG_WARNING("Query {} send profile to {} failed", print_id(query_id), PrintThriftNetworkAddress(coor_addr)); } else { diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h index d7e473019d8..ff61f665342 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.h +++ b/be/src/runtime/runtime_query_statistics_mgr.h @@ -71,10 +71,9 @@ public: static TReportExecStatusParams create_report_exec_status_params_x( const TUniqueId& q_id, - const std::unordered_map<int32, std::vector<std::shared_ptr<TRuntimeProfileTree>>>& + std::unordered_map<int32, std::vector<std::shared_ptr<TRuntimeProfileTree>>> fragment_id_to_profile, - const std::vector<std::shared_ptr<TRuntimeProfileTree>>& load_channel_profile, - bool is_done); + std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profile, bool is_done); static TReportExecStatusParams create_report_exec_status_params_non_pipeline( const TUniqueId& q_id, diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 296e8b08a5e..de1e1cd9b25 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -24,6 +24,7 @@ #include <gen_cpp/Data_types.h> #include <gen_cpp/DorisExternalService_types.h> #include <gen_cpp/FrontendService_types.h> +#include <gen_cpp/Metrics_types.h> #include <gen_cpp/PaloInternalService_types.h> #include <gen_cpp/Planner_types.h> #include <gen_cpp/Status_types.h> @@ -69,6 +70,7 @@ #include "runtime/stream_load/stream_load_recorder.h" #include "util/arrow/row_batch.h" #include "util/defer_op.h" +#include "util/runtime_profile.h" #include "util/threadpool.h" #include "util/thrift_server.h" #include "util/uid_util.h" @@ -1171,7 +1173,15 @@ void BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse return; } - LOG_INFO("Getting realtime exec status of query {}", print_id(request.id)); + RuntimeProfile::Counter get_realtime_timer {TUnit::TIME_NS}; + + Defer _print_log([&]() { + LOG_INFO("Getting realtime exec status of query {} , cost time {}", print_id(request.id), + PrettyPrinter::print(get_realtime_timer.value(), get_realtime_timer.type())); + }); + + SCOPED_TIMER(&get_realtime_timer); + std::unique_ptr<TReportExecStatusParams> report_exec_status_params = std::make_unique<TReportExecStatusParams>(); Status st = ExecEnv::GetInstance()->fragment_mgr()->get_realtime_exec_status( @@ -1187,7 +1197,6 @@ void BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse response.__set_status(Status::OK().to_thrift()); response.__set_report_exec_status_params(*report_exec_status_params); - return; } } // namespace doris diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp index 7c9d40ba3ed..a9e197fba9b 100644 --- a/be/src/util/runtime_profile.cpp +++ b/be/src/util/runtime_profile.cpp @@ -589,15 +589,12 @@ void RuntimeProfile::to_thrift(std::vector<TRuntimeProfileNode>* nodes) { if (this->is_set_sink()) { node.__set_is_sink(this->is_sink()); } - CounterMap counter_map; { std::lock_guard<std::mutex> l(_counter_map_lock); - counter_map = _counter_map; node.child_counters_map = _child_counter_map; - } - - for (auto&& [name, counter] : counter_map) { - counter->to_thrift(name, node.counters, node.child_counters_map); + for (auto&& [name, counter] : _counter_map) { + counter->to_thrift(name, node.counters, node.child_counters_map); + } } { diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index c28329fe5da..b77157d1f5b 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -123,7 +123,7 @@ public: counter.value = this->value(); counter.type = this->type(); counter.__set_level(this->level()); - tcounters.push_back(counter); + tcounters.push_back(std::move(counter)); } TUnit::type type() const { return _type; } diff --git a/be/src/util/thrift_client.h b/be/src/util/thrift_client.h index 2328f298450..e60bc32af72 100644 --- a/be/src/util/thrift_client.h +++ b/be/src/util/thrift_client.h @@ -38,6 +38,11 @@ class TTransport; } // namespace apache namespace doris { + +#define THRIFT_MOVE_VALUES(thrift, member, value) \ + thrift.__isset.member = true; \ + thrift.member = std::move(value); + // Super class for templatized thrift clients. class ThriftClientImpl { public: diff --git a/gensrc/thrift/Makefile b/gensrc/thrift/Makefile index e2d81952d54..bc30124bd81 100644 --- a/gensrc/thrift/Makefile +++ b/gensrc/thrift/Makefile @@ -31,7 +31,7 @@ all: ${GEN_OBJECTS} ${OBJECTS} $(shell mkdir -p ${BUILD_DIR}/gen_java) -THRIFT_CPP_ARGS = -I ${CURDIR} -I ${BUILD_DIR}/thrift/ --gen cpp -out ${BUILD_DIR}/gen_cpp --allow-64bit-consts -strict +THRIFT_CPP_ARGS = -I ${CURDIR} -I ${BUILD_DIR}/thrift/ --gen cpp:moveable_types -out ${BUILD_DIR}/gen_cpp --allow-64bit-consts -strict THRIFT_JAVA_ARGS = -I ${CURDIR} -I ${BUILD_DIR}/thrift/ --gen java:fullcamel -out ${BUILD_DIR}/gen_java --allow-64bit-consts -strict ${BUILD_DIR}/gen_cpp: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org