This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ad986a78ae141914f8cf8252fd109c0c9a31aee6
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Jan 11 20:43:47 2024 +0800

    [Fix](executor)Fix Grayscale upgrade be core dump when report statistics #29843
---
 be/src/common/config.cpp                        |  2 ++
 be/src/common/config.h                          |  1 +
 be/src/runtime/runtime_query_statistics_mgr.cpp | 40 +++++++++++++++++++------
 be/src/runtime/runtime_query_statistics_mgr.h   |  3 +-
 4 files changed, 36 insertions(+), 10 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 89ef077f886..9841b7e38df 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1167,6 +1167,8 @@ DEFINE_mInt64(enable_debug_log_timeout_secs, "0");
 DEFINE_Int32(ignore_invalid_partition_id_rowset_num, "0");
 
 DEFINE_mInt32(report_query_statistics_interval_ms, "3000");
+// 30s
+DEFINE_mInt32(query_statistics_reserve_timeout_ms, "30000");
 
 // clang-format off
 #ifdef BE_TEST
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 750c75ab5f3..8e0403dab27 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1244,6 +1244,7 @@ DECLARE_mBool(enable_column_type_check);
 DECLARE_Int32(ignore_invalid_partition_id_rowset_num);
 
 DECLARE_mInt32(report_query_statistics_interval_ms);
+DECLARE_mInt32(query_statistics_reserve_timeout_ms);
 
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 6a8aa3f5097..a9d1226b0d0 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -20,6 +20,7 @@
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
 #include "util/debug_util.h"
+#include "util/time.h"
 
 namespace doris {
 
@@ -37,9 +38,11 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
     int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
     // 1 get query statistics map
     std::map<TNetworkAddress, std::map<std::string, TQueryStatistics>> fe_qs_map;
-    std::map<std::string, bool> query_finished;
+    std::map<std::string, std::pair<bool, bool>> qs_status; // <finished, timeout>
     {
         std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
+        int64_t current_time = MonotonicMillis();
+        int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms;
         for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
             if (fe_qs_map.find(qs_ctx_ptr->fe_addr) == fe_qs_map.end()) {
                 std::map<std::string, TQueryStatistics> tmp_map;
@@ -53,7 +56,14 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
             TQueryStatistics ret_t_qs;
             tmp_qs.to_thrift(&ret_t_qs);
             fe_qs_map.at(qs_ctx_ptr->fe_addr)[query_id] = ret_t_qs;
-            query_finished[query_id] = qs_ctx_ptr->is_query_finished;
+
+            bool is_query_finished = qs_ctx_ptr->is_query_finished;
+            bool is_timeout_after_finish = false;
+            if (is_query_finished) {
+                is_timeout_after_finish =
+                        (current_time - qs_ctx_ptr->query_finish_time) > conf_qs_timeout;
+            }
+            qs_status[query_id] = std::make_pair(is_query_finished, is_timeout_after_finish);
         }
     }
 
@@ -87,8 +97,12 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
         try {
             coord->reportExecStatus(res, params);
             rpc_result[addr] = true;
+        } catch (apache::thrift::TApplicationException& e) {
+            LOG(WARNING) << "fe " << add_str
+                         << " throw exception when report statistics, reason=" << e.what()
+                         << " , you can see fe log for details.";
         } catch (apache::thrift::transport::TTransportException& e) {
-            LOG(WARNING) << "report workload runtime stats to " << add_str
+            LOG(WARNING) << "report workload runtime statistics to " << add_str
                          << " failed,  err: " << e.what();
             rpc_status = coord.reopen();
             if (!rpc_status.ok()) {
@@ -108,14 +122,20 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
     }
 
     //  3 when query is finished and (last rpc is send success), remove finished query statistics
+    if (fe_qs_map.size() == 0) {
+        return;
+    }
+
     {
         std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
         for (auto& [addr, qs_map] : fe_qs_map) {
-            if (rpc_result[addr]) {
-                for (auto& [query_id, qs] : qs_map) {
-                    if (query_finished[query_id]) {
-                        _query_statistics_ctx_map.erase(query_id);
-                    }
+            bool is_rpc_success = rpc_result[addr];
+            for (auto& [query_id, qs] : qs_map) {
+                auto& qs_status_pair = qs_status[query_id];
+                bool is_query_finished = qs_status_pair.first;
+                bool is_timeout_after_finish = qs_status_pair.second;
+                if ((is_rpc_success && is_query_finished) || is_timeout_after_finish) {
+                    _query_statistics_ctx_map.erase(query_id);
                 }
             }
         }
@@ -128,7 +148,9 @@ void RuntimeQueryStatiticsMgr::set_query_finished(std::string query_id) {
     // when a query get query_ctx succ, but failed before create node/operator,
     // it may not register query statistics, so it can not be mark finish
     if (_query_statistics_ctx_map.find(query_id) != _query_statistics_ctx_map.end()) {
-        _query_statistics_ctx_map.at(query_id)->is_query_finished = true;
+        auto* qs_ptr = _query_statistics_ctx_map.at(query_id).get();
+        qs_ptr->is_query_finished = true;
+        qs_ptr->query_finish_time = MonotonicMillis();
     }
 }
 
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h
index b3fa4bbc408..c4e997d9ffb 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -32,8 +32,9 @@ public:
     ~QueryStatisticsCtx() = default;
 
     std::vector<std::shared_ptr<QueryStatistics>> qs_list;
-    std::atomic<bool> is_query_finished;
+    bool is_query_finished;
     TNetworkAddress fe_addr;
+    int64_t query_finish_time;
 };
 
 class RuntimeQueryStatiticsMgr {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to