This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6c117fe5c2f branch-3.0 pick 49325 fix query statistics leak in BE (#49401)
6c117fe5c2f is described below

commit 6c117fe5c2fba4bb4f8a84473a8875c36d31f597
Author: wangbo <wan...@selectdb.com>
AuthorDate: Tue Mar 25 12:22:28 2025 +0800

    branch-3.0 pick 49325 fix query statistics leak in BE (#49401)
    
    pick #49325
---
 be/src/runtime/runtime_query_statistics_mgr.cpp | 44 +++++++++++++++----------
 1 file changed, 27 insertions(+), 17 deletions(-)

diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp 
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index ebcaf30eab1..0d3c976fedd 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -56,7 +56,7 @@ static Status _do_report_exec_stats_rpc(const 
TNetworkAddress& coor_addr,
                                         TReportExecStatusResult& res) {
     Status client_status;
     FrontendServiceConnection 
rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
-                                         &client_status);
+                                         config::thrift_rpc_timeout_ms, 
&client_status);
     if (!client_status.ok()) {
         LOG_WARNING(
                 "Could not get client rpc client of {} when reporting 
profiles, reason is {}, "
@@ -354,26 +354,36 @@ void 
RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
         std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
         int64_t current_time = MonotonicMillis();
         int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms;
-        for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
-            if (qs_ctx_ptr->_query_type == TQueryType::EXTERNAL) {
-                continue;
-            }
-            if (fe_qs_map.find(qs_ctx_ptr->_fe_addr) == fe_qs_map.end()) {
-                std::map<std::string, TQueryStatistics> tmp_map;
-                fe_qs_map[qs_ctx_ptr->_fe_addr] = std::move(tmp_map);
-            }
-
-            TQueryStatistics ret_t_qs;
-            qs_ctx_ptr->collect_query_statistics(&ret_t_qs);
-            fe_qs_map.at(qs_ctx_ptr->_fe_addr)[query_id] = ret_t_qs;
-
+        for (auto iter = _query_statistics_ctx_map.begin();
+             iter != _query_statistics_ctx_map.end();) {
+            std::string query_id = iter->first;
+            auto& qs_ctx_ptr = iter->second;
             bool is_query_finished = qs_ctx_ptr->_is_query_finished;
             bool is_timeout_after_finish = false;
             if (is_query_finished) {
                 is_timeout_after_finish =
                         (current_time - qs_ctx_ptr->_query_finish_time) > 
conf_qs_timeout;
             }
-            qs_status[query_id] = std::make_pair(is_query_finished, 
is_timeout_after_finish);
+
+            // external query not need to report to FE, so we can remove it 
directly.
+            if (qs_ctx_ptr->_query_type == TQueryType::EXTERNAL && 
is_query_finished) {
+                iter = _query_statistics_ctx_map.erase(iter);
+            } else {
+                if (qs_ctx_ptr->_query_type != TQueryType::EXTERNAL) {
+                    if (fe_qs_map.find(qs_ctx_ptr->_fe_addr) == 
fe_qs_map.end()) {
+                        std::map<std::string, TQueryStatistics> tmp_map;
+                        fe_qs_map[qs_ctx_ptr->_fe_addr] = std::move(tmp_map);
+                    }
+
+                    TQueryStatistics ret_t_qs;
+                    qs_ctx_ptr->collect_query_statistics(&ret_t_qs);
+                    fe_qs_map.at(qs_ctx_ptr->_fe_addr)[query_id] = ret_t_qs;
+                    qs_status[query_id] =
+                            std::make_pair(is_query_finished, 
is_timeout_after_finish);
+                }
+
+                iter++;
+            }
         }
     }
 
@@ -384,7 +394,7 @@ void 
RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
         // 2.1 get client
         Status coord_status;
         FrontendServiceConnection 
coord(ExecEnv::GetInstance()->frontend_client_cache(), addr,
-                                        &coord_status);
+                                        config::thrift_rpc_timeout_ms, 
&coord_status);
         std::string add_str = PrintThriftNetworkAddress(addr);
         if (!coord_status.ok()) {
             std::stringstream ss;
@@ -414,7 +424,7 @@ void 
RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
         } catch (apache::thrift::transport::TTransportException& e) {
             LOG(WARNING) << "[report_query_statistics]report workload runtime 
statistics to "
                          << add_str << " failed,  reason: " << e.what();
-            rpc_status = coord.reopen();
+            rpc_status = coord.reopen(config::thrift_rpc_timeout_ms);
             if (!rpc_status.ok()) {
                 LOG(WARNING) << "[report_query_statistics]reopen thrift client 
failed when report "
                                 "workload runtime statistics to"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to