This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1b47296153e [Refactor](query) refactor lock in fragment mgr and change std::unorder_map to phmap (#45058)
1b47296153e is described below

commit 1b47296153ee7e954bf3ee2dfd926be1ab3ee204
Author: HappenLee <happen...@selectdb.com>
AuthorDate: Tue Dec 10 15:39:09 2024 +0800

    [Refactor](query) refactor lock in fragment mgr and change std::unorder_map to phmap (#45058)
    
    ### What problem does this PR solve?
    
    Cherry pick #44821
    
    Related PR: #44821
---
 be/src/runtime/fragment_mgr.cpp  | 249 ++++++++++++++++++---------------------
 be/src/runtime/fragment_mgr.h    |  32 +++--
 be/src/runtime/load_channel.cpp  |   3 +-
 be/src/runtime/load_stream.cpp   |   2 +-
 be/src/runtime/runtime_state.cpp |   2 +-
 5 files changed, 138 insertions(+), 150 deletions(-)
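
The heart of this change, before reading the diff: FragmentMgr's single std::mutex
`_lock` is split into two std::shared_mutex instances, `_query_ctx_map_mutex` and
`_pipeline_map_mutex`, so the many read-only lookups can share the lock while the
insert/erase paths take it exclusively, and both maps move to phmap::flat_hash_map.
A minimal compilable sketch of the reader/writer pattern, with simplified types
standing in for Doris's (int for TUniqueId, a stub QueryContext):

    #include <memory>
    #include <mutex>
    #include <shared_mutex>
    #include <unordered_map>

    struct QueryContext {};

    class FragmentMgrSketch {
    public:
        // Lookups take a shared_lock: any number of readers run in parallel.
        std::shared_ptr<QueryContext> get(int query_id) {
            std::shared_lock lock(_map_mutex);
            if (auto it = _map.find(query_id); it != _map.end()) {
                return it->second.lock(); // may be null if the context expired
            }
            return nullptr;
        }

        // Mutations take a unique_lock: exclusive against readers and writers.
        void erase(int query_id) {
            std::unique_lock lock(_map_mutex);
            _map.erase(query_id);
        }

        std::shared_ptr<QueryContext> get_or_create(int query_id); // sketched below

    private:
        std::shared_mutex _map_mutex;
        std::unordered_map<int, std::weak_ptr<QueryContext>> _map;
    };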

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 337a7aa41fc..0f29d1de6a6 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -267,8 +267,11 @@ void FragmentMgr::stop() {
 
     // Only me can delete
     {
-        std::lock_guard<std::mutex> lock(_lock);
+        std::unique_lock lock(_query_ctx_map_mutex);
         _query_ctx_map.clear();
+    }
+    {
+        std::unique_lock lock(_pipeline_map_mutex);
         _pipeline_map.clear();
     }
     _thread_pool->shutdown();
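
Note how stop() now clears each map under its own lock in its own scope, so the
two mutexes are never held at the same time and no lock-ordering rule between them
is needed. If both ever did have to be held at once, the deadlock-free form would
be std::scoped_lock, e.g. (a sketch using the commit's member names, not code from
this patch):

    std::scoped_lock both(_query_ctx_map_mutex, _pipeline_map_mutex); // acquires atomically
    _query_ctx_map.clear();
    _pipeline_map.clear();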
@@ -620,11 +623,7 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r
     TUniqueId query_id;
     query_id.__set_hi(request->query_id().hi());
     query_id.__set_lo(request->query_id().lo());
-    std::shared_ptr<QueryContext> q_ctx = nullptr;
-    {
-        std::lock_guard<std::mutex> lock(_lock);
-        q_ctx = _get_or_erase_query_ctx(query_id);
-    }
+    auto q_ctx = get_query_ctx(query_id);
     if (q_ctx) {
         q_ctx->set_ready_to_execute(Status::OK());
         LOG_INFO("Query {} start execution", print_id(query_id));
@@ -639,116 +638,110 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r
 
 void FragmentMgr::remove_pipeline_context(
         std::shared_ptr<pipeline::PipelineFragmentContext> f_context) {
-    {
-        std::lock_guard<std::mutex> lock(_lock);
-        auto query_id = f_context->get_query_id();
-        int64 now = duration_cast<std::chrono::milliseconds>(
-                            std::chrono::system_clock::now().time_since_epoch())
-                            .count();
-        g_fragment_executing_count << -1;
-        g_fragment_last_active_time.set_value(now);
-        // this log will show when a query is really finished in BEs
-        LOG_INFO("Removing query {} fragment {}", print_id(query_id), f_context->get_fragment_id());
-        _pipeline_map.erase({query_id, f_context->get_fragment_id()});
-    }
+    auto query_id = f_context->get_query_id();
+    int64 now = duration_cast<std::chrono::milliseconds>(
+                        std::chrono::system_clock::now().time_since_epoch())
+                        .count();
+    g_fragment_executing_count << -1;
+    g_fragment_last_active_time.set_value(now);
+
+    // this log will show when a query is really finished in BEs
+    LOG_INFO("Removing query {} fragment {}", print_id(query_id), f_context->get_fragment_id());
+
+    std::unique_lock lock(_pipeline_map_mutex);
+    _pipeline_map.erase({query_id, f_context->get_fragment_id()});
 }
 
-std::shared_ptr<QueryContext> FragmentMgr::_get_or_erase_query_ctx(const TUniqueId& query_id) {
+std::shared_ptr<QueryContext> FragmentMgr::get_query_ctx(const TUniqueId& query_id) {
+    std::shared_lock lock(_query_ctx_map_mutex);
     auto search = _query_ctx_map.find(query_id);
     if (search != _query_ctx_map.end()) {
         if (auto q_ctx = search->second.lock()) {
             return q_ctx;
-        } else {
-            LOG(WARNING) << "Query context (query id = " << print_id(query_id)
-                         << ") has been released.";
-            _query_ctx_map.erase(search);
-            return nullptr;
         }
     }
     return nullptr;
 }
 
-std::shared_ptr<QueryContext> FragmentMgr::get_or_erase_query_ctx_with_lock(
-        const TUniqueId& query_id) {
-    std::unique_lock<std::mutex> lock(_lock);
-    return _get_or_erase_query_ctx(query_id);
-}
-
-template <typename Params>
-Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
-                                   QuerySource query_source,
-                                   std::shared_ptr<QueryContext>& query_ctx) {
+Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& params,
+                                             TUniqueId query_id, bool pipeline,
+                                             QuerySource query_source,
+                                             std::shared_ptr<QueryContext>& query_ctx) {
     DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed", {
         return Status::InternalError("FragmentMgr._get_query_ctx.failed, query id {}",
                                      print_id(query_id));
     });
+
+    // Search _query_ctx_map first, in case some other request has already
+    // created the query fragments context.
+    query_ctx = get_query_ctx(query_id);
     if (params.is_simplified_param) {
         // Get common components from _query_ctx_map
-        std::lock_guard<std::mutex> lock(_lock);
-        if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
-            query_ctx = q_ctx;
-        } else {
+        if (!query_ctx) {
             return Status::InternalError(
                     "Failed to get query fragments context. Query {} may be 
timeout or be "
                     "cancelled. host: {}",
                     print_id(query_id), BackendOptions::get_localhost());
         }
     } else {
-        // Find _query_ctx_map, in case some other request has already
-        // create the query fragments context.
-        std::lock_guard<std::mutex> lock(_lock);
-        if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
-            query_ctx = q_ctx;
-            return Status::OK();
-        }
+        if (!query_ctx) {
+            std::unique_lock lock(_query_ctx_map_mutex);
+            // Only one thread needs to create the query ctx; other threads just get query_ctx from _query_ctx_map.
+            auto search = _query_ctx_map.find(query_id);
+            if (search != _query_ctx_map.end()) {
+                query_ctx = search->second.lock();
+            }
 
-        // First time a fragment of a query arrived. print logs.
-        LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord
-                  << ", total fragment num on current host: " << params.fragment_num_on_host
-                  << ", fe process uuid: " << params.query_options.fe_process_uuid
-                  << ", query type: " << params.query_options.query_type
-                  << ", report audit fe:" << params.current_connect_fe;
-
-        // This may be a first fragment request of the query.
-        // Create the query fragments context.
-        query_ctx = QueryContext::create_shared(query_id, _exec_env, params.query_options,
-                                                params.coord, pipeline, params.is_nereids,
-                                                params.current_connect_fe, query_source);
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
-        RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
-                                              &(query_ctx->desc_tbl)));
-        // set file scan range params
-        if (params.__isset.file_scan_params) {
-            query_ctx->file_scan_range_params_map = params.file_scan_params;
-        }
+            if (!query_ctx) {
+                // First time a fragment of a query arrived. print logs.
+                LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord
+                          << ", total fragment num on current host: " << params.fragment_num_on_host
+                          << ", fe process uuid: " << params.query_options.fe_process_uuid
+                          << ", query type: " << params.query_options.query_type
+                          << ", report audit fe:" << params.current_connect_fe;
+
+                // This may be a first fragment request of the query.
+                // Create the query fragments context.
+                query_ctx = QueryContext::create_shared(query_id, _exec_env, params.query_options,
+                                                        params.coord, pipeline, params.is_nereids,
+                                                        params.current_connect_fe, query_source);
+                SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
+                RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
+                                                      &(query_ctx->desc_tbl)));
+                // set file scan range params
+                if (params.__isset.file_scan_params) {
+                    query_ctx->file_scan_range_params_map = params.file_scan_params;
+                }
 
-        query_ctx->query_globals = params.query_globals;
+                query_ctx->query_globals = params.query_globals;
 
-        if (params.__isset.resource_info) {
-            query_ctx->user = params.resource_info.user;
-            query_ctx->group = params.resource_info.group;
-            query_ctx->set_rsc_info = true;
-        }
+                if (params.__isset.resource_info) {
+                    query_ctx->user = params.resource_info.user;
+                    query_ctx->group = params.resource_info.group;
+                    query_ctx->set_rsc_info = true;
+                }
 
-        _set_scan_concurrency(params, query_ctx.get());
-
-        if (params.__isset.workload_groups && !params.workload_groups.empty()) {
-            uint64_t tg_id = params.workload_groups[0].id;
-            WorkloadGroupPtr workload_group_ptr =
-                    _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id);
-            if (workload_group_ptr != nullptr) {
-                RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
-                RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
-                _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id),
-                                                                                  tg_id);
-            } else {
-                LOG(WARNING) << "Query/load id: " << print_id(query_ctx->query_id())
-                             << "can't find its workload group " << tg_id;
+                _set_scan_concurrency(params, query_ctx.get());
+
+                if (params.__isset.workload_groups && !params.workload_groups.empty()) {
+                    uint64_t tg_id = params.workload_groups[0].id;
+                    WorkloadGroupPtr workload_group_ptr =
+                            _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id);
+                    if (workload_group_ptr != nullptr) {
+                        RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
+                        RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
+                        _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
+                                print_id(query_id), tg_id);
+                    } else {
+                        LOG(WARNING) << "Query/load id: " << print_id(query_ctx->query_id())
+                                     << "can't find its workload group " << tg_id;
+                    }
+                }
+                // The query ctx's dtor has side effects (for example, it may remove the query id
+                // from the workload group's query set), so we cannot check for an existing entry
+                // and delete the temp query ctx here.
+                _query_ctx_map.insert({query_id, query_ctx});
             }
         }
-        // There is some logic in query ctx's dctor, we could not check if exists and delete the
-        // temp query ctx now. For example, the query id maybe removed from workload group's queryset.
-        _query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx));
     }
     return Status::OK();
 }
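
The restructured _get_or_create_query_ctx above is a double-checked pattern: an
optimistic lookup under the shared lock (get_query_ctx), then a re-check under the
exclusive lock before constructing, so exactly one thread creates the context and
latecomers pick up the existing one. Continuing the simplified FragmentMgrSketch
from the top of this mail (illustrative names, not this commit's code):

    std::shared_ptr<QueryContext> FragmentMgrSketch::get_or_create(int query_id) {
        if (auto ctx = get(query_id)) { // fast path under the shared lock
            return ctx;
        }
        std::unique_lock lock(_map_mutex);
        // Re-check: another thread may have created the context between the
        // shared-locked lookup and this exclusive acquisition.
        if (auto it = _map.find(query_id); it != _map.end()) {
            if (auto ctx = it->second.lock()) {
                return ctx;
            }
        }
        auto ctx = std::make_shared<QueryContext>();
        _map[query_id] = ctx; // stored as weak_ptr; the caller keeps it alive
        return ctx;
    }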
@@ -762,13 +755,13 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
     fmt::memory_buffer debug_string_buffer;
     size_t i = 0;
     {
-        std::lock_guard<std::mutex> lock(_lock);
         fmt::format_to(debug_string_buffer,
                        "{} pipeline fragment contexts are still running! 
duration_limit={}\n",
                        _pipeline_map.size(), duration);
-
         timespec now;
         clock_gettime(CLOCK_MONOTONIC, &now);
+
+        std::shared_lock lock(_pipeline_map_mutex);
         for (auto& it : _pipeline_map) {
             auto elapsed = it.second->elapsed_time() / 1000000000.0;
             if (elapsed < duration) {
@@ -787,7 +780,7 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
 }
 
 std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) {
-    if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+    if (auto q_ctx = get_query_ctx(query_id)) {
         return q_ctx->print_all_pipeline_context();
     } else {
         return fmt::format(
@@ -806,7 +799,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
              << 
apache::thrift::ThriftDebugString(params.query_options).c_str();
 
     std::shared_ptr<QueryContext> query_ctx;
-    RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_source, query_ctx));
+    RETURN_IF_ERROR(
+            _get_or_create_query_ctx(params, params.query_id, true, query_source, query_ctx));
     SCOPED_ATTACH_TASK(query_ctx.get());
     int64_t duration_ns = 0;
     std::shared_ptr<pipeline::PipelineFragmentContext> context =
@@ -839,16 +833,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
     }
 
     {
-        // (query_id, fragment_id) is executed only on one BE, locks _pipeline_map.
-        std::lock_guard<std::mutex> lock(_lock);
         for (const auto& local_param : params.local_params) {
            const TUniqueId& fragment_instance_id = local_param.fragment_instance_id;
-            auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
-            if (iter != _pipeline_map.end()) {
-                return Status::InternalError(
-                        "exec_plan_fragment query_id({}) input duplicated 
fragment_id({})",
-                        print_id(params.query_id), params.fragment_id);
-            }
             query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
         }
 
@@ -857,7 +843,15 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
                             .count();
         g_fragment_executing_count << 1;
         g_fragment_last_active_time.set_value(now);
-        // TODO: simplify this mapping
+
+        // (query_id, fragment_id) is executed only on one BE, locks _pipeline_map.
+        std::unique_lock lock(_pipeline_map_mutex);
+        auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
+        if (iter != _pipeline_map.end()) {
+            return Status::InternalError(
+                    "exec_plan_fragment query_id({}) input duplicated 
fragment_id({})",
+                    print_id(params.query_id), params.fragment_id);
+        }
         _pipeline_map.insert({{params.query_id, params.fragment_id}, context});
     }
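
The duplicate-fragment check now sits in the same unique_lock scope as the insert,
closing the window in which another thread could register the same (query_id,
fragment_id) between the check and the insert. Since both std::unordered_map and
phmap::flat_hash_map return a pair<iterator, bool> from insert(), the two steps
could also be fused into one call; a sketch of that variant, not this commit's code:

    std::unique_lock lock(_pipeline_map_mutex);
    auto [iter, inserted] = _pipeline_map.insert({{params.query_id, params.fragment_id}, context});
    if (!inserted) {
        return Status::InternalError("exec_plan_fragment query_id({}) input duplicated fragment_id({})",
                                     print_id(params.query_id), params.fragment_id);
    }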
 
@@ -887,8 +881,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
     std::shared_ptr<QueryContext> query_ctx = nullptr;
     std::vector<TUniqueId> all_instance_ids;
     {
-        std::lock_guard<std::mutex> state_lock(_lock);
-        if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+        if (auto q_ctx = get_query_ctx(query_id)) {
             query_ctx = q_ctx;
             // Copy instanceids to avoid concurrent modification.
             // And to reduce the scope of lock.
@@ -901,7 +894,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
     }
     query_ctx->cancel(reason);
     {
-        std::lock_guard<std::mutex> state_lock(_lock);
+        std::unique_lock l(_query_ctx_map_mutex);
         _query_ctx_map.erase(query_id);
     }
     LOG(INFO) << "Query " << print_id(query_id)
@@ -937,7 +930,7 @@ void FragmentMgr::cancel_worker() {
 
         std::vector<std::shared_ptr<pipeline::PipelineFragmentContext>> ctx;
         {
-            std::lock_guard<std::mutex> lock(_lock);
+            std::shared_lock lock(_pipeline_map_mutex);
             ctx.reserve(_pipeline_map.size());
             for (auto& pipeline_itr : _pipeline_map) {
                 ctx.push_back(pipeline_itr.second);
@@ -948,21 +941,24 @@ void FragmentMgr::cancel_worker() {
         }
 
         {
-            std::lock_guard<std::mutex> lock(_lock);
-            for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
-                if (auto q_ctx = it->second.lock()) {
-                    if (q_ctx->is_timeout(now)) {
-                        LOG_WARNING("Query {} is timeout", print_id(it->first));
-                        queries_timeout.push_back(it->first);
+            {
+                // TODO: Currently only the cancel worker GCs _query_ctx_map; each query should
+                // erase its own entry from _query_ctx_map when it finishes. Rethink whether this
+                // logic is OK.
+                std::unique_lock lock(_query_ctx_map_mutex);
+                for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
+                    if (auto q_ctx = it->second.lock()) {
+                        if (q_ctx->is_timeout(now)) {
+                            LOG_WARNING("Query {} is timeout", print_id(it->first));
+                            queries_timeout.push_back(it->first);
+                        }
                         ++it;
                     } else {
-                        ++it;
+                        it = _query_ctx_map.erase(it);
                     }
-                } else {
-                    it = _query_ctx_map.erase(it);
                 }
             }
 
+            std::shared_lock lock(_query_ctx_map_mutex);
             // We use a very conservative cancel strategy.
             // 0. If there are no running frontends, do not cancel any queries.
             // 1. If query's process uuid is zero, do not cancel
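
Garbage collection of expired weak_ptr entries is now concentrated in the cancel
worker: the GC pass runs under the exclusive lock, then the conservative-cancel
scan below retakes only a shared lock (so the map may change between the two
critical sections). The erase-while-iterating idiom, on the simplified map from
the first sketch:

    std::unique_lock lock(_map_mutex);
    for (auto it = _map.begin(); it != _map.end();) {
        if (auto ctx = it->second.lock()) {
            ++it;                // context still alive: keep the entry
        } else {
            it = _map.erase(it); // expired: erase() returns the next iterator
        }
    }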
@@ -1186,7 +1182,7 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
 
     const auto& fragment_ids = request->fragment_ids();
     {
-        std::unique_lock<std::mutex> lock(_lock);
+        std::shared_lock lock(_pipeline_map_mutex);
         for (auto fragment_id : fragment_ids) {
             if (is_pipeline) {
                 auto iter = _pipeline_map.find(
@@ -1242,8 +1238,7 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
         TUniqueId query_id;
         query_id.__set_hi(queryid.hi);
         query_id.__set_lo(queryid.lo);
-        std::lock_guard<std::mutex> lock(_lock);
-        if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+        if (auto q_ctx = get_query_ctx(query_id)) {
             query_ctx = q_ctx;
         } else {
             return Status::EndOfFile(
@@ -1266,8 +1261,7 @@ Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
         TUniqueId query_id;
         query_id.__set_hi(queryid.hi);
         query_id.__set_lo(queryid.lo);
-        std::lock_guard<std::mutex> lock(_lock);
-        if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+        if (auto q_ctx = get_query_ctx(query_id)) {
             query_ctx = q_ctx;
         } else {
             return Status::EndOfFile(
@@ -1287,8 +1281,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
         TUniqueId query_id;
         query_id.__set_hi(queryid.hi);
         query_id.__set_lo(queryid.lo);
-        std::lock_guard<std::mutex> lock(_lock);
-        if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+        if (auto q_ctx = get_query_ctx(query_id)) {
             query_ctx = q_ctx;
         } else {
             return Status::EndOfFile(
@@ -1305,7 +1298,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
 
 void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) {
     {
-        std::lock_guard<std::mutex> lock(_lock);
+        std::unique_lock lock(_query_ctx_map_mutex);
         for (auto iter = _query_ctx_map.begin(); iter != _query_ctx_map.end();) {
             if (auto q_ctx = iter->second.lock()) {
                 WorkloadQueryInfo workload_query_info;
@@ -1328,19 +1321,9 @@ Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,
         return Status::InvalidArgument("exes_status is nullptr");
     }
 
-    std::shared_ptr<QueryContext> query_context = nullptr;
-
-    {
-        std::lock_guard<std::mutex> lock(_lock);
-        if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
-            query_context = q_ctx;
-        } else {
-            return Status::NotFound("Query {} has been released", print_id(query_id));
-        }
-    }
-
+    std::shared_ptr<QueryContext> query_context = get_query_ctx(query_id);
     if (query_context == nullptr) {
-        return Status::NotFound("Query {} not found", print_id(query_id));
+        return Status::NotFound("Query {} not found or released", print_id(query_id));
     }
 
     *exec_status = query_context->get_realtime_exec_status();
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 20b2fd8cdc2..41ea67844e0 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -133,7 +133,7 @@ public:
     ThreadPool* get_thread_pool() { return _thread_pool.get(); }
 
     int32_t running_query_num() {
-        std::unique_lock<std::mutex> ctx_lock(_lock);
+        std::shared_lock lock(_query_ctx_map_mutex);
         return _query_ctx_map.size();
     }
 
@@ -145,35 +145,41 @@ public:
     Status get_realtime_exec_status(const TUniqueId& query_id,
                                     TReportExecStatusParams* exec_status);
 
-    std::shared_ptr<QueryContext> get_or_erase_query_ctx_with_lock(const TUniqueId& query_id);
+    std::shared_ptr<QueryContext> get_query_ctx(const TUniqueId& query_id);
 
 private:
     std::shared_ptr<QueryContext> _get_or_erase_query_ctx(const TUniqueId& query_id);
 
+    struct BrpcItem {
+        TNetworkAddress network_address;
+        std::vector<std::weak_ptr<QueryContext>> queries;
+    };
+
     template <typename Param>
     void _set_scan_concurrency(const Param& params, QueryContext* query_ctx);
 
-    template <typename Params>
-    Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
-                          QuerySource query_type, std::shared_ptr<QueryContext>& query_ctx);
+    Status _get_or_create_query_ctx(const TPipelineFragmentParams& params, TUniqueId query_id,
+                                    bool pipeline, QuerySource query_type,
+                                    std::shared_ptr<QueryContext>& query_ctx);
 
     // This is input params
     ExecEnv* _exec_env = nullptr;
 
+    // This lock protects `_pipeline_map`
+    std::shared_mutex _pipeline_map_mutex;
+    // (QueryID, FragmentID) -> PipelineFragmentContext
+    phmap::flat_hash_map<std::pair<TUniqueId, int>,
+                         std::shared_ptr<pipeline::PipelineFragmentContext>>
+            _pipeline_map;
+
     // The lock should only be used to protect the structures in fragment manager. Has to be
     // used in a very small scope because it may dead lock. For example, if the _lock is used
     // in prepare stage, the call path is  prepare --> expr prepare --> may call allocator
     // when allocate failed, allocator may call query_is_cancelled, query is callced will also
     // call _lock, so that there is dead lock.
-    std::mutex _lock;
-
-    // (QueryID, FragmentID) -> PipelineFragmentContext
-    std::unordered_map<std::pair<TUniqueId, int>,
-                       std::shared_ptr<pipeline::PipelineFragmentContext>>
-            _pipeline_map;
-
+    std::shared_mutex _query_ctx_map_mutex;
     // query id -> QueryContext
-    std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctx_map;
+    phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctx_map;
     std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map;
 
     CountDownLatch _stop_background_threads_latch;
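
Both maps also switch from std::unordered_map to phmap::flat_hash_map (the
parallel-hashmap library), an open-addressing table whose contiguous storage
typically outperforms std::unordered_map on lookup-heavy paths like these. A
standalone usage sketch with a composite (query id, fragment id) key; the PairHash
below is illustrative only, since Doris supplies its own hashing for TUniqueId:

    #include <cstdint>
    #include <utility>
    #include "parallel_hashmap/phmap.h"
    #include "parallel_hashmap/phmap_utils.h"

    // Illustrative hash for the composite key, combining both fields.
    struct PairHash {
        size_t operator()(const std::pair<uint64_t, int>& k) const {
            return phmap::HashState().combine(0, k.first, k.second);
        }
    };

    int main() {
        phmap::flat_hash_map<std::pair<uint64_t, int>, const char*, PairHash> pipeline_map;
        pipeline_map.insert({{42, 0}, "fragment-0"});
        return pipeline_map.contains({42, 0}) ? 0 : 1;
    }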
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 9369c0c833c..4ff83ff93df 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -45,8 +45,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig
           _backend_id(backend_id),
           _enable_profile(enable_profile) {
     std::shared_ptr<QueryContext> query_context =
-            ExecEnv::GetInstance()->fragment_mgr()->get_or_erase_query_ctx_with_lock(
-                    _load_id.to_thrift());
+            ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(_load_id.to_thrift());
     std::shared_ptr<MemTrackerLimiter> mem_tracker = nullptr;
     WorkloadGroupPtr wg_ptr = nullptr;
 
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 752e2ff95b2..60da45fa685 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -428,7 +428,7 @@ LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool e
     TUniqueId load_tid = ((UniqueId)load_id).to_thrift();
 #ifndef BE_TEST
     std::shared_ptr<QueryContext> query_context =
-            ExecEnv::GetInstance()->fragment_mgr()->get_or_erase_query_ctx_with_lock(load_tid);
+            ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(load_tid);
     if (query_context != nullptr) {
         _query_thread_context = {load_tid, query_context->query_mem_tracker,
                                  query_context->workload_group()};
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 4f24824ac70..34d9be9bcb2 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -299,7 +299,7 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt
 }
 
 std::weak_ptr<QueryContext> RuntimeState::get_query_ctx_weak() {
-    return _exec_env->fragment_mgr()->get_or_erase_query_ctx_with_lock(_query_ctx->query_id());
+    return _exec_env->fragment_mgr()->get_query_ctx(_query_ctx->query_id());
 }
 
 void RuntimeState::init_mem_trackers(const std::string& name, const TUniqueId& id) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
