This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 9272c650b4d [Refactor](query) refactor lock in fragment mgr and change 
std::unordered_map to phmap (#45069)
9272c650b4d is described below

commit 9272c650b4dbb4781d010f4f68afe721a42fbc0d
Author: HappenLee <happen...@selectdb.com>
AuthorDate: Thu Dec 19 22:27:33 2024 +0800

    [Refactor](query) refactor lock in fragment mgr and change std::unordered_map 
to phmap (#45069)
    
    ### What problem does this PR solve?
    
    Related PR: #44821
---
 be/src/runtime/fragment_mgr.cpp | 71 +++++++++++++++++++++++++++--------------
 be/src/runtime/fragment_mgr.h   |  5 +--
 2 files changed, 50 insertions(+), 26 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index f43190ebb36..b7bbaf8f206 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -271,12 +271,17 @@ void FragmentMgr::stop() {
     {
         std::lock_guard<std::mutex> lock(_lock);
         _fragment_instance_map.clear();
-        _query_ctx_map.clear();
         for (auto& pipeline : _pipeline_map) {
             pipeline.second->close_sink();
         }
         _pipeline_map.clear();
     }
+
+    {
+        std::unique_lock lock(_query_ctx_map_lock);
+        _query_ctx_map.clear();
+    }
+
     _async_report_thread_pool->shutdown();
 }
 
@@ -620,11 +625,11 @@ void 
FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
         
_fragment_instance_map.erase(fragment_executor->fragment_instance_id());
 
         LOG_INFO("Instance {} finished", 
print_id(fragment_executor->fragment_instance_id()));
-
-        if (all_done && query_ctx) {
-            _query_ctx_map.erase(query_ctx->query_id());
-            LOG_INFO("Query {} finished", print_id(query_ctx->query_id()));
-        }
+    }
+    if (all_done && query_ctx) {
+        std::unique_lock lock(_query_ctx_map_lock);
+        _query_ctx_map.erase(query_ctx->query_id());
+        LOG_INFO("Query {} finished", print_id(query_ctx->query_id()));
     }
 
     // Callback after remove from this id
@@ -713,8 +718,10 @@ Status FragmentMgr::start_query_execution(const 
PExecPlanFragmentStartRequest* r
     query_id.__set_lo(request->query_id().lo());
     std::shared_ptr<QueryContext> q_ctx = nullptr;
     {
-        std::lock_guard<std::mutex> lock(_lock);
-
+        TUniqueId query_id;
+        query_id.__set_hi(request->query_id().hi());
+        query_id.__set_lo(request->query_id().lo());
+        std::shared_lock lock(_query_ctx_map_lock);
         auto search = _query_ctx_map.find(query_id);
         if (search == _query_ctx_map.end()) {
             return Status::InternalError(
@@ -732,22 +739,24 @@ Status FragmentMgr::start_query_execution(const 
PExecPlanFragmentStartRequest* r
 void FragmentMgr::remove_pipeline_context(
         std::shared_ptr<pipeline::PipelineFragmentContext> f_context) {
     auto* q_context = f_context->get_query_ctx();
+    bool all_done = false;
+    TUniqueId query_id = f_context->get_query_id();
     {
         std::lock_guard<std::mutex> lock(_lock);
-        auto query_id = f_context->get_query_id();
         std::vector<TUniqueId> ins_ids;
         f_context->instance_ids(ins_ids);
-        bool all_done = q_context->countdown(ins_ids.size());
+        all_done = q_context->countdown(ins_ids.size());
         for (const auto& ins_id : ins_ids) {
             LOG_INFO("Removing query {} instance {}, all done? {}", 
print_id(query_id),
                      print_id(ins_id), all_done);
             _pipeline_map.erase(ins_id);
             g_pipeline_fragment_instances_count << -1;
         }
-        if (all_done) {
-            LOG_INFO("Query {} finished", print_id(query_id));
-            _query_ctx_map.erase(query_id);
-        }
+    }
+    if (all_done) {
+        std::unique_lock lock(_query_ctx_map_lock);
+        _query_ctx_map.erase(query_id);
+        LOG_INFO("Query {} finished", print_id(query_id));
     }
 }
 
@@ -759,7 +768,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
                     { return 
Status::InternalError("FragmentMgr._get_query_ctx.failed"); });
     if (params.is_simplified_param) {
         // Get common components from _query_ctx_map
-        std::lock_guard<std::mutex> lock(_lock);
+        std::shared_lock lock(_query_ctx_map_lock);
         auto search = _query_ctx_map.find(query_id);
         if (search == _query_ctx_map.end()) {
             return Status::InternalError(
@@ -771,7 +780,16 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
     } else {
         // Find _query_ctx_map, in case some other request has already
         // create the query fragments context.
-        std::lock_guard<std::mutex> lock(_lock);
+        {
+            std::shared_lock lock(_query_ctx_map_lock);
+            auto search = _query_ctx_map.find(query_id);
+            if (search != _query_ctx_map.end()) {
+                query_ctx = search->second;
+                return Status::OK();
+            }
+        }
+
+        std::unique_lock lock(_query_ctx_map_lock);
         auto search = _query_ctx_map.find(query_id);
         if (search != _query_ctx_map.end()) {
             query_ctx = search->second;
@@ -1170,7 +1188,7 @@ void FragmentMgr::_set_scan_concurrency(const Param& 
params, QueryContext* query
 }
 
 std::shared_ptr<QueryContext> FragmentMgr::get_query_context(const TUniqueId& 
query_id) {
-    std::lock_guard<std::mutex> state_lock(_lock);
+    std::shared_lock lock(_query_ctx_map_lock);
     auto ctx = _query_ctx_map.find(query_id);
     if (ctx != _query_ctx_map.end()) {
         return ctx->second;
@@ -1184,7 +1202,7 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, 
const PPlanFragmentCan
     std::shared_ptr<QueryContext> query_ctx;
     std::vector<TUniqueId> all_instance_ids;
     {
-        std::lock_guard<std::mutex> state_lock(_lock);
+        std::shared_lock lock(_query_ctx_map_lock);
         auto ctx_iter = _query_ctx_map.find(query_id);
 
         if (ctx_iter == _query_ctx_map.end()) {
@@ -1251,7 +1269,7 @@ void FragmentMgr::cancel_instance(const TUniqueId& 
instance_id,
 
 void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t 
fragment_id,
                                   const PPlanFragmentCancelReason& reason, 
const std::string& msg) {
-    std::unique_lock<std::mutex> lock(_lock);
+    std::shared_lock lock(_query_ctx_map_lock);
     auto q_ctx_iter = _query_ctx_map.find(query_id);
     if (q_ctx_iter != _query_ctx_map.end()) {
         // Has to use value to keep the shared ptr not deconstructed.
@@ -1315,6 +1333,9 @@ void FragmentMgr::cancel_worker() {
                     pipeline_itr.second->clear_finished_tasks();
                 }
             }
+        }
+        {
+            std::unique_lock lock(_query_ctx_map_lock);
             for (auto it = _query_ctx_map.begin(); it != 
_query_ctx_map.end();) {
                 if (it->second->is_timeout(now)) {
                     LOG_WARNING("Query {} is timeout", print_id(it->first));
@@ -1335,7 +1356,9 @@ void FragmentMgr::cancel_worker() {
                     ++it;
                 }
             }
-
+        }
+        {
+            std::shared_lock lock(_query_ctx_map_lock);
             // We use a very conservative cancel strategy.
             // 0. If there are no running frontends, do not cancel any queries.
             // 1. If query's process uuid is zero, do not cancel
@@ -1773,7 +1796,7 @@ Status FragmentMgr::send_filter_size(const 
PSendFilterSizeRequest* request) {
         TUniqueId query_id;
         query_id.__set_hi(queryid.hi);
         query_id.__set_lo(queryid.lo);
-        std::lock_guard<std::mutex> lock(_lock);
+        std::shared_lock lock(_query_ctx_map_lock);
         auto iter = _query_ctx_map.find(query_id);
         if (iter == _query_ctx_map.end()) {
             return Status::EndOfFile("Query context (query-id: {}) not found, 
maybe finished",
@@ -1796,7 +1819,7 @@ Status FragmentMgr::sync_filter_size(const 
PSyncFilterSizeRequest* request) {
         TUniqueId query_id;
         query_id.__set_hi(queryid.hi);
         query_id.__set_lo(queryid.lo);
-        std::lock_guard<std::mutex> lock(_lock);
+        std::shared_lock lock(_query_ctx_map_lock);
         auto iter = _query_ctx_map.find(query_id);
         if (iter == _query_ctx_map.end()) {
             return Status::InvalidArgument("query-id: {}", 
queryid.to_string());
@@ -1819,7 +1842,7 @@ Status FragmentMgr::merge_filter(const 
PMergeFilterRequest* request,
         TUniqueId query_id;
         query_id.__set_hi(queryid.hi);
         query_id.__set_lo(queryid.lo);
-        std::lock_guard<std::mutex> lock(_lock);
+        std::shared_lock lock(_query_ctx_map_lock);
         auto iter = _query_ctx_map.find(query_id);
         if (iter == _query_ctx_map.end()) {
             return Status::InvalidArgument("query-id: {}", 
queryid.to_string());
@@ -1914,7 +1937,7 @@ void 
FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFrag
 
 void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* 
query_info_list) {
     {
-        std::lock_guard<std::mutex> lock(_lock);
+        std::shared_lock lock(_query_ctx_map_lock);
         for (const auto& q : _query_ctx_map) {
             WorkloadQueryInfo workload_query_info;
             workload_query_info.query_id = print_id(q.first);
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 0c1bb3033d9..53cea30686f 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -143,7 +143,7 @@ public:
     std::shared_ptr<QueryContext> get_query_context(const TUniqueId& query_id);
 
     int32_t running_query_num() {
-        std::unique_lock<std::mutex> ctx_lock(_lock);
+        std::shared_lock ctx_lock(_query_ctx_map_lock);
         return _query_ctx_map.size();
     }
 
@@ -201,8 +201,9 @@ private:
 
     std::unordered_map<TUniqueId, 
std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map;
 
+    std::shared_mutex _query_ctx_map_lock;
     // query id -> QueryContext
-    std::unordered_map<TUniqueId, std::shared_ptr<QueryContext>> 
_query_ctx_map;
+    phmap::flat_hash_map<TUniqueId, std::shared_ptr<QueryContext>> 
_query_ctx_map;
     std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> 
_bf_size_map;
 
     CountDownLatch _stop_background_threads_latch;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to