This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit ad986a78ae141914f8cf8252fd109c0c9a31aee6 Author: wangbo <wan...@apache.org> AuthorDate: Thu Jan 11 20:43:47 2024 +0800 [Fix](executor)Fix Grayscale upgrade be core dump when report statistics #29843 --- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 1 + be/src/runtime/runtime_query_statistics_mgr.cpp | 40 +++++++++++++++++++------ be/src/runtime/runtime_query_statistics_mgr.h | 3 +- 4 files changed, 36 insertions(+), 10 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 89ef077f886..9841b7e38df 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1167,6 +1167,8 @@ DEFINE_mInt64(enable_debug_log_timeout_secs, "0"); DEFINE_Int32(ignore_invalid_partition_id_rowset_num, "0"); DEFINE_mInt32(report_query_statistics_interval_ms, "3000"); +// 30s +DEFINE_mInt32(query_statistics_reserve_timeout_ms, "30000"); // clang-format off #ifdef BE_TEST diff --git a/be/src/common/config.h b/be/src/common/config.h index 750c75ab5f3..8e0403dab27 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1244,6 +1244,7 @@ DECLARE_mBool(enable_column_type_check); DECLARE_Int32(ignore_invalid_partition_id_rowset_num); DECLARE_mInt32(report_query_statistics_interval_ms); +DECLARE_mInt32(query_statistics_reserve_timeout_ms); #ifdef BE_TEST // test s3 diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 6a8aa3f5097..a9d1226b0d0 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -20,6 +20,7 @@ #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "util/debug_util.h" +#include "util/time.h" namespace doris { @@ -37,9 +38,11 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() { int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id; // 1 get query statistics map std::map<TNetworkAddress, std::map<std::string, TQueryStatistics>> fe_qs_map; - 
std::map<std::string, bool> query_finished; + std::map<std::string, std::pair<bool, bool>> qs_status; // <finished, timeout> { std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock); + int64_t current_time = MonotonicMillis(); + int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms; for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) { if (fe_qs_map.find(qs_ctx_ptr->fe_addr) == fe_qs_map.end()) { std::map<std::string, TQueryStatistics> tmp_map; @@ -53,7 +56,14 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() { TQueryStatistics ret_t_qs; tmp_qs.to_thrift(&ret_t_qs); fe_qs_map.at(qs_ctx_ptr->fe_addr)[query_id] = ret_t_qs; - query_finished[query_id] = qs_ctx_ptr->is_query_finished; + + bool is_query_finished = qs_ctx_ptr->is_query_finished; + bool is_timeout_after_finish = false; + if (is_query_finished) { + is_timeout_after_finish = + (current_time - qs_ctx_ptr->query_finish_time) > conf_qs_timeout; + } + qs_status[query_id] = std::make_pair(is_query_finished, is_timeout_after_finish); } } @@ -87,8 +97,12 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() { try { coord->reportExecStatus(res, params); rpc_result[addr] = true; + } catch (apache::thrift::TApplicationException& e) { + LOG(WARNING) << "fe " << add_str + << " throw exception when report statistics, reason=" << e.what() + << " , you can see fe log for details."; } catch (apache::thrift::transport::TTransportException& e) { - LOG(WARNING) << "report workload runtime stats to " << add_str + LOG(WARNING) << "report workload runtime statistics to " << add_str << " failed, err: " << e.what(); rpc_status = coord.reopen(); if (!rpc_status.ok()) { @@ -108,14 +122,20 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() { } // 3 when query is finished and (last rpc is send success), remove finished query statistics + if (fe_qs_map.size() == 0) { + return; + } + { std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock); for 
(auto& [addr, qs_map] : fe_qs_map) { - if (rpc_result[addr]) { - for (auto& [query_id, qs] : qs_map) { - if (query_finished[query_id]) { - _query_statistics_ctx_map.erase(query_id); - } + bool is_rpc_success = rpc_result[addr]; + for (auto& [query_id, qs] : qs_map) { + auto& qs_status_pair = qs_status[query_id]; + bool is_query_finished = qs_status_pair.first; + bool is_timeout_after_finish = qs_status_pair.second; + if ((is_rpc_success && is_query_finished) || is_timeout_after_finish) { + _query_statistics_ctx_map.erase(query_id); } } } @@ -128,7 +148,9 @@ void RuntimeQueryStatiticsMgr::set_query_finished(std::string query_id) { // when a query get query_ctx succ, but failed before create node/operator, // it may not register query statistics, so it can not be mark finish if (_query_statistics_ctx_map.find(query_id) != _query_statistics_ctx_map.end()) { - _query_statistics_ctx_map.at(query_id)->is_query_finished = true; + auto* qs_ptr = _query_statistics_ctx_map.at(query_id).get(); + qs_ptr->is_query_finished = true; + qs_ptr->query_finish_time = MonotonicMillis(); } } diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h index b3fa4bbc408..c4e997d9ffb 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.h +++ b/be/src/runtime/runtime_query_statistics_mgr.h @@ -32,8 +32,9 @@ public: ~QueryStatisticsCtx() = default; std::vector<std::shared_ptr<QueryStatistics>> qs_list; - std::atomic<bool> is_query_finished; + bool is_query_finished; TNetworkAddress fe_addr; + int64_t query_finish_time; }; class RuntimeQueryStatiticsMgr { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org