This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new df6935ad044 [Improvement](fragment) Improvement map performance in fragment mgr (#46245)
df6935ad044 is described below

commit df6935ad04497408463986a1826c92f818fbf405
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Thu Jan 2 15:11:56 2025 +0800

    [Improvement](fragment) Improvement map performance in fragment mgr (#46245)
    
    pick #46235
---
 be/src/common/config.cpp        |   1 +
 be/src/common/config.h          |   2 +-
 be/src/runtime/fragment_mgr.cpp | 713 +++++++++++++++++++++-------------------
 be/src/runtime/fragment_mgr.h   |  65 +++-
 be/src/runtime/query_context.h  |   6 +
 5 files changed, 434 insertions(+), 353 deletions(-)
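
In short, the patch replaces the FragmentMgr maps that were guarded by a single
_lock / _query_ctx_map_lock with a ConcurrentContextMap that shards entries across
config::num_query_ctx_map_partitions partitions (default 128), each protected by its
own std::shared_mutex, so lookups for different queries no longer contend on one
global mutex. The sketch below is a minimal, standalone illustration of that sharding
idea using a hypothetical ShardedMap, not the committed class: it uses
std::unordered_map and std::hash where the patch uses phmap::flat_hash_map and
HashUtil::hash over TUniqueId.lo/hi, and it omits the apply()/apply_if_not_exists()
hooks the patch adds.

    // Sharded map sketch: N partitions, each with its own reader/writer lock.
    #include <cstddef>
    #include <functional>
    #include <memory>
    #include <mutex>
    #include <shared_mutex>
    #include <unordered_map>
    #include <vector>

    template <typename Key, typename Value>
    class ShardedMap {
    public:
        explicit ShardedMap(size_t partitions = 128) : _shards(partitions) {}

        std::shared_ptr<Value> find(const Key& key) const {
            const Shard& shard = _shards[_index(key)];
            std::shared_lock lock(shard.mutex); // shared: concurrent readers on one shard
            auto it = shard.map.find(key);
            return it == shard.map.end() ? nullptr : it->second;
        }

        void insert(const Key& key, std::shared_ptr<Value> value) {
            Shard& shard = _shards[_index(key)];
            std::unique_lock lock(shard.mutex); // exclusive, but only this shard is blocked
            shard.map[key] = std::move(value);
        }

        void erase(const Key& key) {
            Shard& shard = _shards[_index(key)];
            std::unique_lock lock(shard.mutex);
            shard.map.erase(key);
        }

        size_t num_items() const { // whole-map size still visits every shard
            size_t n = 0;
            for (const Shard& shard : _shards) {
                std::shared_lock lock(shard.mutex);
                n += shard.map.size();
            }
            return n;
        }

    private:
        struct Shard {
            mutable std::shared_mutex mutex;
            std::unordered_map<Key, std::shared_ptr<Value>> map;
        };

        size_t _index(const Key& key) const { return std::hash<Key>{}(key) % _shards.size(); }

        std::vector<Shard> _shards;
    };

The point of this shape is that a hot path such as FragmentMgr::get_query_context()
only takes a shared lock on one partition, while map-wide operations (clear,
num_items, the apply callbacks) still walk all partitions.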

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 483e7753ec2..f047071139e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -219,6 +219,7 @@ DEFINE_Int32(check_consistency_worker_count, "1");
 DEFINE_Int32(upload_worker_count, "1");
 // the count of thread to download
 DEFINE_Int32(download_worker_count, "1");
+DEFINE_Int32(num_query_ctx_map_partitions, "128");
 // the count of thread to make snapshot
 DEFINE_Int32(make_snapshot_worker_count, "5");
 // the count of thread to release snapshot
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 3579ce54fce..29080a56def 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1346,7 +1346,7 @@ DECLARE_Int32(spill_io_thread_pool_thread_num);
 DECLARE_Int32(spill_io_thread_pool_queue_size);
 
 DECLARE_mBool(check_segment_when_build_rowset_meta);
-
+DECLARE_Int32(num_query_ctx_map_partitions);
 // max s3 client retry times
 DECLARE_mInt32(max_s3_client_retry);
 // When meet s3 429 error, the "get" request will
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index b7bbaf8f206..0788b5e3206 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -222,12 +222,95 @@ static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries
     return result;
 }
 
+inline uint32_t get_map_id(const TUniqueId& query_id, size_t capacity) {
+    uint32_t value = HashUtil::hash(&query_id.lo, 8, 0);
+    value = HashUtil::hash(&query_id.hi, 8, value);
+    return value % capacity;
+}
+
+inline uint32_t get_map_id(std::pair<TUniqueId, int> key, size_t capacity) {
+    uint32_t value = HashUtil::hash(&key.first.lo, 8, 0);
+    value = HashUtil::hash(&key.first.hi, 8, value);
+    return value % capacity;
+}
+
+template <typename Key, typename Value, typename ValueType>
+ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
+    _internal_map.resize(config::num_query_ctx_map_partitions);
+    for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
+        _internal_map[i] = {std::make_unique<std::shared_mutex>(),
+                            phmap::flat_hash_map<Key, Value>()};
+    }
+}
+
+template <typename Key, typename Value, typename ValueType>
+Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) {
+    auto id = get_map_id(query_id, _internal_map.size());
+    {
+        std::shared_lock lock(*_internal_map[id].first);
+        auto& map = _internal_map[id].second;
+        auto search = map.find(query_id);
+        if (search != map.end()) {
+            return search->second;
+        }
+        return std::shared_ptr<ValueType>(nullptr);
+    }
+}
+
+template <typename Key, typename Value, typename ValueType>
+Status ConcurrentContextMap<Key, Value, ValueType>::apply_if_not_exists(
+        const Key& query_id, std::shared_ptr<ValueType>& query_ctx, ApplyFunction&& function) {
+    auto id = get_map_id(query_id, _internal_map.size());
+    {
+        std::unique_lock lock(*_internal_map[id].first);
+        auto& map = _internal_map[id].second;
+        auto search = map.find(query_id);
+        if (search != map.end()) {
+            query_ctx = search->second;
+        }
+        if (!query_ctx) {
+            return function(map);
+        }
+        return Status::OK();
+    }
+}
+
+template <typename Key, typename Value, typename ValueType>
+void ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
+    auto id = get_map_id(query_id, _internal_map.size());
+    {
+        std::unique_lock lock(*_internal_map[id].first);
+        auto& map = _internal_map[id].second;
+        map.erase(query_id);
+    }
+}
+
+template <typename Key, typename Value, typename ValueType>
+void ConcurrentContextMap<Key, Value, ValueType>::insert(const Key& query_id,
+                                                         std::shared_ptr<ValueType> query_ctx) {
+    auto id = get_map_id(query_id, _internal_map.size());
+    {
+        std::unique_lock lock(*_internal_map[id].first);
+        auto& map = _internal_map[id].second;
+        map.insert({query_id, query_ctx});
+    }
+}
+
+template <typename Key, typename Value, typename ValueType>
+void ConcurrentContextMap<Key, Value, ValueType>::clear() {
+    for (auto& pair : _internal_map) {
+        std::unique_lock lock(*pair.first);
+        auto& map = pair.second;
+        map.clear();
+    }
+}
+
 FragmentMgr::FragmentMgr(ExecEnv* exec_env)
         : _exec_env(exec_env), _stop_background_threads_latch(1) {
     _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
     INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
     REGISTER_HOOK_METRIC(fragment_instance_count,
-                         [this]() { return _fragment_instance_map.size(); });
+                         [this]() { return _fragment_instance_map.num_items(); });
 
     auto s = Thread::create(
             "FragmentMgr", "cancel_timeout_plan_fragment", [this]() { 
this->cancel_worker(); },
@@ -268,20 +351,17 @@ void FragmentMgr::stop() {
     _thread_pool->shutdown();
 
     // Only me can delete
-    {
-        std::lock_guard<std::mutex> lock(_lock);
-        _fragment_instance_map.clear();
-        for (auto& pipeline : _pipeline_map) {
-            pipeline.second->close_sink();
-        }
-        _pipeline_map.clear();
-    }
-
-    {
-        std::unique_lock lock(_query_ctx_map_lock);
-        _query_ctx_map.clear();
-    }
-
+    _fragment_instance_map.clear();
+    _pipeline_map.apply(
+            [&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>>&
+                        map) -> Status {
+                for (auto& pipeline : map) {
+                    pipeline.second->close_sink();
+                }
+                return Status::OK();
+            });
+    _pipeline_map.clear();
+    _query_ctx_map.clear();
     _async_report_thread_pool->shutdown();
 }
 
@@ -621,13 +701,11 @@ void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
 
     // remove exec state after this fragment finished
     {
-        std::lock_guard<std::mutex> lock(_lock);
         _fragment_instance_map.erase(fragment_executor->fragment_instance_id());
 
         LOG_INFO("Instance {} finished", print_id(fragment_executor->fragment_instance_id()));
     }
     if (all_done && query_ctx) {
-        std::unique_lock lock(_query_ctx_map_lock);
         _query_ctx_map.erase(query_ctx->query_id());
         LOG_INFO("Query {} finished", print_id(query_ctx->query_id()));
     }
@@ -721,15 +799,13 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r
         TUniqueId query_id;
         query_id.__set_hi(request->query_id().hi());
         query_id.__set_lo(request->query_id().lo());
-        std::shared_lock lock(_query_ctx_map_lock);
-        auto search = _query_ctx_map.find(query_id);
-        if (search == _query_ctx_map.end()) {
+        q_ctx = _query_ctx_map.find(query_id);
+        if (q_ctx == nullptr) {
             return Status::InternalError(
                     "Failed to get query fragments context. Query may be "
                     "timeout or be cancelled. host: {}",
                     BackendOptions::get_localhost());
         }
-        q_ctx = search->second;
     }
     q_ctx->set_ready_to_execute(false);
     LOG_INFO("Query {} start execution", print_id(query_id));
@@ -742,7 +818,6 @@ void FragmentMgr::remove_pipeline_context(
     bool all_done = false;
     TUniqueId query_id = f_context->get_query_id();
     {
-        std::lock_guard<std::mutex> lock(_lock);
         std::vector<TUniqueId> ins_ids;
         f_context->instance_ids(ins_ids);
         all_done = q_context->countdown(ins_ids.size());
@@ -754,7 +829,6 @@ void FragmentMgr::remove_pipeline_context(
         }
     }
     if (all_done) {
-        std::unique_lock lock(_query_ctx_map_lock);
         _query_ctx_map.erase(query_id);
         LOG_INFO("Query {} finished", print_id(query_id));
     }
@@ -768,98 +842,90 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
                     { return Status::InternalError("FragmentMgr._get_query_ctx.failed"); });
     if (params.is_simplified_param) {
         // Get common components from _query_ctx_map
-        std::shared_lock lock(_query_ctx_map_lock);
-        auto search = _query_ctx_map.find(query_id);
-        if (search == _query_ctx_map.end()) {
+        query_ctx = _query_ctx_map.find(query_id);
+        if (query_ctx == nullptr) {
             return Status::InternalError(
                     "Failed to get query fragments context. Query may be "
                     "timeout or be cancelled. host: {}",
                     BackendOptions::get_localhost());
         }
-        query_ctx = search->second;
     } else {
         // Find _query_ctx_map, in case some other request has already
         // create the query fragments context.
-        {
-            std::shared_lock lock(_query_ctx_map_lock);
-            auto search = _query_ctx_map.find(query_id);
-            if (search != _query_ctx_map.end()) {
-                query_ctx = search->second;
-                return Status::OK();
-            }
-        }
-
-        std::unique_lock lock(_query_ctx_map_lock);
-        auto search = _query_ctx_map.find(query_id);
-        if (search != _query_ctx_map.end()) {
-            query_ctx = search->second;
-            return Status::OK();
-        }
-
-        TNetworkAddress current_connect_fe_addr;
-        // for gray upragde between 2.1 version, fe may not set current_connect_fe,
-        // then use coord addr instead
-        if (params.__isset.current_connect_fe) {
-            current_connect_fe_addr = params.current_connect_fe;
-        } else {
-            current_connect_fe_addr = params.coord;
-        }
+        RETURN_IF_ERROR(_query_ctx_map.apply_if_not_exists(
+                query_id, query_ctx,
+                [&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<QueryContext>>& map) -> Status {
+                    TNetworkAddress current_connect_fe_addr;
+                    // for gray upragde between 2.1 version, fe may not set current_connect_fe,
+                    // then use coord addr instead
+                    if (params.__isset.current_connect_fe) {
+                        current_connect_fe_addr = params.current_connect_fe;
+                    } else {
+                        current_connect_fe_addr = params.coord;
+                    }
 
-        LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord
-                  << ", total fragment num on current host: " << params.fragment_num_on_host
-                  << ", fe process uuid: " << params.query_options.fe_process_uuid
-                  << ", query type: " << params.query_options.query_type
-                  << ", report audit fe:" << current_connect_fe_addr;
-
-        // This may be a first fragment request of the query.
-        // Create the query fragments context.
-        query_ctx = QueryContext::create_shared(
-                query_id, params.fragment_num_on_host, _exec_env, params.query_options,
-                params.coord, pipeline, params.is_nereids, current_connect_fe_addr, query_source);
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
-        RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
-                                              &(query_ctx->desc_tbl)));
-        // set file scan range params
-        if (params.__isset.file_scan_params) {
-            query_ctx->file_scan_range_params_map = params.file_scan_params;
-        }
+                    LOG(INFO) << "query_id: " << print_id(query_id)
+                              << ", coord_addr: " << params.coord
+                              << ", total fragment num on current host: "
+                              << params.fragment_num_on_host
+                              << ", fe process uuid: " << params.query_options.fe_process_uuid
+                              << ", query type: " << params.query_options.query_type
+                              << ", report audit fe:" << current_connect_fe_addr;
+
+                    // This may be a first fragment request of the query.
+                    // Create the query fragments context.
+                    query_ctx = QueryContext::create_shared(
+                            query_id, params.fragment_num_on_host, _exec_env, params.query_options,
+                            params.coord, pipeline, params.is_nereids, current_connect_fe_addr,
+                            query_source);
+                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
+                    RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
+                                                          &(query_ctx->desc_tbl)));
+                    // set file scan range params
+                    if (params.__isset.file_scan_params) {
+                        query_ctx->file_scan_range_params_map = params.file_scan_params;
+                    }
 
-        query_ctx->query_globals = params.query_globals;
+                    query_ctx->query_globals = params.query_globals;
 
-        if (params.__isset.resource_info) {
-            query_ctx->user = params.resource_info.user;
-            query_ctx->group = params.resource_info.group;
-            query_ctx->set_rsc_info = true;
-        }
+                    if (params.__isset.resource_info) {
+                        query_ctx->user = params.resource_info.user;
+                        query_ctx->group = params.resource_info.group;
+                        query_ctx->set_rsc_info = true;
+                    }
 
-        query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(pipeline);
-        _set_scan_concurrency(params, query_ctx.get());
-        const bool is_pipeline = std::is_same_v<TPipelineFragmentParams, Params>;
-
-        if (params.__isset.workload_groups && !params.workload_groups.empty()) {
-            uint64_t tg_id = params.workload_groups[0].id;
-            WorkloadGroupPtr workload_group_ptr =
-                    _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id);
-            if (workload_group_ptr != nullptr) {
-                RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
-                RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
-                _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id),
-                                                                                  tg_id);
-
-                LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
-                          << ", use workload group: " << workload_group_ptr->debug_string()
-                          << ", is pipeline: " << ((int)is_pipeline);
-            } else {
-                LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
-                          << " carried group info but can not find group in be";
-            }
-        }
-        // There is some logic in query ctx's dctor, we could not check if exists and delete the
-        // temp query ctx now. For example, the query id maybe removed from workload group's queryset.
-        _query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx));
-        LOG(INFO) << "Register query/load memory tracker, query/load id: "
-                  << print_id(query_ctx->query_id())
-                  << " limit: " << PrettyPrinter::print(query_ctx->mem_limit(), TUnit::BYTES);
+                    query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(
+                            pipeline);
+                    _set_scan_concurrency(params, query_ctx.get());
+                    const bool is_pipeline = std::is_same_v<TPipelineFragmentParams, Params>;
+
+                    if (params.__isset.workload_groups && !params.workload_groups.empty()) {
+                        uint64_t tg_id = params.workload_groups[0].id;
+                        WorkloadGroupPtr workload_group_ptr =
+                                _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id);
+                        if (workload_group_ptr != nullptr) {
+                            RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
+                            RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
+                            _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
+                                    print_id(query_id), tg_id);
+
+                            LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
+                                      << ", use workload group: "
+                                      << workload_group_ptr->debug_string()
+                                      << ", is pipeline: " << ((int)is_pipeline);
+                        } else {
+                            LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
+                                      << " carried group info but can not find group in be";
+                        }
+                    }
+                    // There is some logic in query ctx's dctor, we could not check if exists and delete the
+                    // temp query ctx now. For example, the query id maybe removed from workload group's queryset.
+                    map.insert({query_id, query_ctx});
+                    LOG(INFO) << "Register query/load memory tracker, query/load id: "
+                              << print_id(query_ctx->query_id()) << " limit: "
+                              << PrettyPrinter::print(query_ctx->mem_limit(), TUnit::BYTES);
+                    return Status::OK();
+                }));
     }
     return Status::OK();
 }
@@ -874,9 +940,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
              << apache::thrift::ThriftDebugString(params.query_options).c_str();
     const TUniqueId& fragment_instance_id = params.params.fragment_instance_id;
     {
-        std::lock_guard<std::mutex> lock(_lock);
         auto iter = _fragment_instance_map.find(fragment_instance_id);
-        if (iter != _fragment_instance_map.end()) {
+        if (iter != nullptr) {
             // Duplicated
             LOG(WARNING) << "duplicate fragment instance id: " << print_id(fragment_instance_id);
             return Status::OK();
@@ -894,8 +959,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
         // Need lock here, because it will modify fragment ids and std::vector may resize and reallocate
         // memory, but query_is_canncelled will traverse the vector, it will core.
         // query_is_cancelled is called in allocator, we has to avoid dead lock.
-        std::lock_guard<std::mutex> lock(_lock);
-        query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
+        query_ctx->push_instance_ids(fragment_instance_id);
     }
 
     auto fragment_executor = std::make_shared<PlanFragmentExecutor>(
@@ -921,12 +985,10 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
             params.params, params.params.query_id, params.query_options, &handler,
             RuntimeFilterParamsContext::create(fragment_executor->runtime_state())));
     {
-        std::lock_guard<std::mutex> lock(_lock);
         if (handler) {
             query_ctx->set_merge_controller_handler(handler);
         }
-        _fragment_instance_map.insert(
-                std::make_pair(params.params.fragment_instance_id, fragment_executor));
+        _fragment_instance_map.insert(params.params.fragment_instance_id, fragment_executor);
     }
 
     auto st = _thread_pool->submit_func([this, fragment_executor, cb]() {
@@ -938,7 +1000,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
     if (!st.ok()) {
         {
             // Remove the exec state added
-            std::lock_guard<std::mutex> lock(_lock);
             _fragment_instance_map.erase(params.params.fragment_instance_id);
         }
         fragment_executor->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
@@ -957,27 +1018,33 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
     auto t = MonotonicNanos();
     size_t i = 0;
     {
-        std::lock_guard<std::mutex> lock(_lock);
         fmt::format_to(debug_string_buffer,
                        "{} pipeline fragment contexts are still running! 
duration_limit={}\n",
-                       _pipeline_map.size(), duration);
+                       _pipeline_map.num_items(), duration);
 
         timespec now;
         clock_gettime(CLOCK_MONOTONIC, &now);
-        for (auto& it : _pipeline_map) {
-            auto elapsed = (t - it.second->create_time()) / 1000000000.0;
-            if (elapsed < duration) {
-                // Only display tasks which has been running for more than {duration} seconds.
-                continue;
-            }
-            auto timeout_second = it.second->timeout_second();
-            fmt::format_to(debug_string_buffer,
-                           "No.{} (elapse_second={}s, query_timeout_second={}s, instance_id="
-                           "{}) : {}\n",
-                           i, elapsed, timeout_second, print_id(it.first),
-                           it.second->debug_string());
-            i++;
-        }
+        _pipeline_map.apply(
+                [&](phmap::flat_hash_map<TUniqueId,
+                                         std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
+                        -> Status {
+                    for (auto& it : map) {
+                        auto elapsed = (t - it.second->create_time()) / 1000000000.0;
+                        if (elapsed < duration) {
+                            // Only display tasks which has been running for more than {duration} seconds.
+                            continue;
+                        }
+                        auto timeout_second = it.second->timeout_second();
+                        fmt::format_to(
+                                debug_string_buffer,
+                                "No.{} (elapse_second={}s, query_timeout_second={}s, instance_id="
+                                "{}) : {}\n",
+                                i, elapsed, timeout_second, print_id(it.first),
+                                it.second->debug_string());
+                        i++;
+                    }
+                    return Status::OK();
+                });
     }
     return fmt::to_string(debug_string_buffer);
 }
@@ -1037,14 +1104,13 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
 
         for (const auto& local_param : params.local_params) {
             const TUniqueId& fragment_instance_id = local_param.fragment_instance_id;
-            std::lock_guard<std::mutex> lock(_lock);
             auto iter = _pipeline_map.find(fragment_instance_id);
-            if (iter != _pipeline_map.end()) {
+            if (iter != nullptr) {
                 return Status::InternalError(
                         "exec_plan_fragment input duplicated 
fragment_instance_id({})",
                         UniqueId(fragment_instance_id).to_string());
             }
-            query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
+            query_ctx->push_instance_ids(fragment_instance_id);
         }
 
         if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
@@ -1052,13 +1118,12 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
         }
 
         {
-            std::lock_guard<std::mutex> lock(_lock);
             std::vector<TUniqueId> ins_ids;
             reinterpret_cast<pipeline::PipelineXFragmentContext*>(context.get())
                     ->instance_ids(ins_ids);
             // TODO: simplify this mapping
             for (const auto& ins_id : ins_ids) {
-                _pipeline_map.insert({ins_id, context});
+                _pipeline_map.insert(ins_id, context);
             }
         }
         query_ctx->set_pipeline_context(params.fragment_id, context);
@@ -1071,13 +1136,12 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
 
             const TUniqueId& fragment_instance_id = local_params.fragment_instance_id;
             {
-                std::lock_guard<std::mutex> lock(_lock);
-                auto iter = _pipeline_map.find(fragment_instance_id);
-                if (iter != _pipeline_map.end()) {
+                auto res = _pipeline_map.find(fragment_instance_id);
+                if (res != nullptr) {
                     // Duplicated
                     return Status::OK();
                 }
-                query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
+                query_ctx->push_instance_ids(fragment_instance_id);
             }
 
             int64_t duration_ns = 0;
@@ -1115,10 +1179,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
             if (i == 0 && handler) {
                 query_ctx->set_merge_controller_handler(handler);
             }
-            {
-                std::lock_guard<std::mutex> lock(_lock);
-                _pipeline_map.insert(std::make_pair(fragment_instance_id, context));
-            }
+            _pipeline_map.insert(fragment_instance_id, context);
 
             return context->submit();
         };
@@ -1188,13 +1249,7 @@ void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext* query
 }
 
 std::shared_ptr<QueryContext> FragmentMgr::get_query_context(const TUniqueId& query_id) {
-    std::shared_lock lock(_query_ctx_map_lock);
-    auto ctx = _query_ctx_map.find(query_id);
-    if (ctx != _query_ctx_map.end()) {
-        return ctx->second;
-    } else {
-        return nullptr;
-    }
+    return _query_ctx_map.find(query_id);
 }
 
 void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
@@ -1202,15 +1257,13 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan
     std::shared_ptr<QueryContext> query_ctx;
     std::vector<TUniqueId> all_instance_ids;
     {
-        std::shared_lock lock(_query_ctx_map_lock);
-        auto ctx_iter = _query_ctx_map.find(query_id);
+        query_ctx = _query_ctx_map.find(query_id);
 
-        if (ctx_iter == _query_ctx_map.end()) {
+        if (query_ctx == nullptr) {
             LOG(WARNING) << "Query " << print_id(query_id)
                          << " does not exists, failed to cancel it";
             return;
         }
-        query_ctx = ctx_iter->second;
         // Copy instanceids to avoid concurrent modification.
         // And to reduce the scope of lock.
         all_instance_ids = query_ctx->fragment_instance_ids;
@@ -1224,10 +1277,7 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan
     }
 
     query_ctx->cancel(msg, Status::Cancelled(msg));
-    {
-        std::lock_guard<std::mutex> state_lock(_lock);
-        _query_ctx_map.erase(query_id);
-    }
+    _query_ctx_map.erase(query_id);
     LOG(INFO) << "Query " << print_id(query_id) << " is cancelled and removed. Reason: " << msg;
 }
 
@@ -1236,22 +1286,10 @@ void FragmentMgr::cancel_instance(const TUniqueId& instance_id,
     std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_ctx;
     std::shared_ptr<PlanFragmentExecutor> non_pipeline_ctx;
     {
-        std::lock_guard<std::mutex> state_lock(_lock);
-        const bool is_pipeline_instance = _pipeline_map.contains(instance_id);
-        if (is_pipeline_instance) {
-            auto itr = _pipeline_map.find(instance_id);
-            if (itr != _pipeline_map.end()) {
-                pipeline_ctx = itr->second;
-            } else {
-                LOG(WARNING) << "Could not find the pipeline instance id:" << print_id(instance_id)
-                             << " to cancel";
-                return;
-            }
-        } else {
-            auto itr = _fragment_instance_map.find(instance_id);
-            if (itr != _fragment_instance_map.end()) {
-                non_pipeline_ctx = itr->second;
-            } else {
+        pipeline_ctx = _pipeline_map.find(instance_id);
+        if (!pipeline_ctx) {
+            non_pipeline_ctx = _fragment_instance_map.find(instance_id);
+            if (non_pipeline_ctx == nullptr) {
                 LOG(WARNING) << "Could not find the fragment instance id:" << print_id(instance_id)
                              << " to cancel";
                 return;
@@ -1269,14 +1307,10 @@ void FragmentMgr::cancel_instance(const TUniqueId& instance_id,
 
 void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t fragment_id,
                                   const PPlanFragmentCancelReason& reason, const std::string& msg) {
-    std::shared_lock lock(_query_ctx_map_lock);
-    auto q_ctx_iter = _query_ctx_map.find(query_id);
-    if (q_ctx_iter != _query_ctx_map.end()) {
+    auto res = _query_ctx_map.find(query_id);
+    if (res != nullptr) {
         // Has to use value to keep the shared ptr not deconstructed.
-        std::shared_ptr<QueryContext> q_ctx = q_ctx_iter->second;
-        // the lock should only be used to protect the map, not scope query ctx
-        lock.unlock();
-        WARN_IF_ERROR(q_ctx->cancel_pipeline_context(fragment_id, reason, msg),
+        WARN_IF_ERROR(res->cancel_pipeline_context(fragment_id, reason, msg),
                       "fail to cancel fragment");
     } else {
         LOG(WARNING) << "Could not find the query id:" << print_id(query_id)
@@ -1314,141 +1348,161 @@ void FragmentMgr::cancel_worker() {
         VecDateTimeValue now = VecDateTimeValue::local_time();
         std::unordered_map<std::shared_ptr<PBackendService_Stub>, BrpcItem> brpc_stub_with_queries;
         {
-            std::lock_guard<std::mutex> lock(_lock);
-            for (auto& fragment_instance_itr : _fragment_instance_map) {
-                if (fragment_instance_itr.second->is_timeout(now)) {
-                    queries_timeout.push_back(fragment_instance_itr.second->fragment_instance_id());
-                }
-            }
-
-            for (auto& pipeline_itr : _pipeline_map) {
-                if (pipeline_itr.second->is_timeout(now)) {
-                    std::vector<TUniqueId> ins_ids;
-                    reinterpret_cast<pipeline::PipelineXFragmentContext*>(pipeline_itr.second.get())
-                            ->instance_ids(ins_ids);
-                    for (auto& ins_id : ins_ids) {
-                        queries_timeout.push_back(ins_id);
-                    }
-                } else {
-                    pipeline_itr.second->clear_finished_tasks();
-                }
-            }
+            _fragment_instance_map.apply(
+                    [&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<PlanFragmentExecutor>>& map)
+                            -> Status {
+                        for (auto& fragment_instance_itr : map) {
+                            if (fragment_instance_itr.second->is_timeout(now)) {
+                                queries_timeout.push_back(
+                                        fragment_instance_itr.second->fragment_instance_id());
+                            }
+                        }
+                        return Status::OK();
+                    });
+            _pipeline_map.apply(
+                    [&](phmap::flat_hash_map<
+                            TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
+                            -> Status {
+                        for (auto& pipeline_itr : map) {
+                            if (pipeline_itr.second->is_timeout(now)) {
+                                std::vector<TUniqueId> ins_ids;
+                                reinterpret_cast<pipeline::PipelineXFragmentContext*>(
+                                        pipeline_itr.second.get())
+                                        ->instance_ids(ins_ids);
+                                for (auto& ins_id : ins_ids) {
+                                    queries_timeout.push_back(ins_id);
+                                }
+                            } else {
+                                pipeline_itr.second->clear_finished_tasks();
+                            }
+                        }
+                        return Status::OK();
+                    });
         }
         {
-            std::unique_lock lock(_query_ctx_map_lock);
-            for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
-                if (it->second->is_timeout(now)) {
-                    LOG_WARNING("Query {} is timeout", print_id(it->first));
-                    it = _query_ctx_map.erase(it);
-                } else {
-                    if (config::enable_brpc_connection_check) {
-                        auto brpc_stubs = it->second->get_using_brpc_stubs();
-                        for (auto& item : brpc_stubs) {
-                            if (!brpc_stub_with_queries.contains(item.second)) {
-                                brpc_stub_with_queries.emplace(item.second,
-                                                               BrpcItem {item.first, {it->second}});
-                            } else {
-                                brpc_stub_with_queries[item.second].queries.emplace_back(
-                                        it->second);
+            _query_ctx_map.apply([&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<QueryContext>>&
+                                             map) -> Status {
+                for (auto it = map.begin(); it != map.end();) {
+                    if (it->second->is_timeout(now)) {
+                        LOG_WARNING("Query {} is timeout", print_id(it->first));
+                        it = map.erase(it);
+                    } else {
+                        if (config::enable_brpc_connection_check) {
+                            auto brpc_stubs = it->second->get_using_brpc_stubs();
+                            for (auto& item : brpc_stubs) {
+                                if (!brpc_stub_with_queries.contains(item.second)) {
+                                    brpc_stub_with_queries.emplace(
+                                            item.second, BrpcItem {item.first, {it->second}});
+                                } else {
+                                    brpc_stub_with_queries[item.second].queries.emplace_back(
+                                            it->second);
+                                }
                             }
                         }
+                        ++it;
                     }
-                    ++it;
                 }
-            }
+                return Status::OK();
+            });
         }
         {
-            std::shared_lock lock(_query_ctx_map_lock);
             // We use a very conservative cancel strategy.
             // 0. If there are no running frontends, do not cancel any queries.
             // 1. If query's process uuid is zero, do not cancel
             // 2. If same process uuid, do not cancel
             // 3. If fe has zero process uuid, do not cancel
-            if (running_fes.empty() && !_query_ctx_map.empty()) {
+            if (running_fes.empty() && _query_ctx_map.num_items() != 0) {
                 LOG_EVERY_N(WARNING, 10)
                         << "Could not find any running frontends, maybe we are 
upgrading or "
                            "starting? "
                         << "We will not cancel any outdated queries in this 
situation.";
             } else {
-                for (const auto& q : _query_ctx_map) {
-                    auto q_ctx = q.second;
-                    const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
-
-                    if (fe_process_uuid == 0) {
-                        // zero means this query is from a older version fe or
-                        // this fe is starting
-                        continue;
-                    }
+                _query_ctx_map.apply([&](phmap::flat_hash_map<TUniqueId,
+                                                              std::shared_ptr<QueryContext>>& map)
+                                             -> Status {
+                    for (const auto& q : map) {
+                        auto q_ctx = q.second;
+                        const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
+
+                        if (fe_process_uuid == 0) {
+                            // zero means this query is from a older version fe or
+                            // this fe is starting
+                            continue;
+                        }
 
-                    // If the query is not running on the any frontends, cancel it.
-                    if (auto itr = running_queries_on_all_fes.find(fe_process_uuid);
-                        itr != running_queries_on_all_fes.end()) {
-                        // Query not found on this frontend, and the query arrives before the last check
-                        if (itr->second.find(q_ctx->query_id()) == itr->second.end() &&
-                            // tv_nsec represents the number of nanoseconds that have elapsed since the time point stored in tv_sec.
-                            // tv_sec is enough, we do not need to check tv_nsec.
-                            q_ctx->get_query_arrival_timestamp().tv_sec <
-                                    check_invalid_query_last_timestamp.tv_sec &&
-                            q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
-                            if (q_ctx->enable_pipeline_x_exec()) {
-                                queries_pipeline_task_leak.push_back(q_ctx->query_id());
-                                LOG_INFO(
-                                        "Query {}, type {} is not found on any frontends, maybe it "
-                                        "is leaked.",
-                                        print_id(q_ctx->query_id()),
-                                        toString(q_ctx->get_query_source()));
-                                continue;
+                        // If the query is not running on the any frontends, cancel it.
+                        if (auto itr = running_queries_on_all_fes.find(fe_process_uuid);
+                            itr != running_queries_on_all_fes.end()) {
+                            // Query not found on this frontend, and the query arrives before the last check
+                            if (itr->second.find(q_ctx->query_id()) == itr->second.end() &&
+                                // tv_nsec represents the number of nanoseconds that have elapsed since the time point stored in tv_sec.
+                                // tv_sec is enough, we do not need to check tv_nsec.
+                                q_ctx->get_query_arrival_timestamp().tv_sec <
+                                        check_invalid_query_last_timestamp.tv_sec &&
+                                q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
+                                if (q_ctx->enable_pipeline_x_exec()) {
+                                    queries_pipeline_task_leak.push_back(q_ctx->query_id());
+                                    LOG_INFO(
+                                            "Query {}, type {} is not found on any frontends, "
+                                            "maybe it "
+                                            "is leaked.",
+                                            print_id(q_ctx->query_id()),
+                                            toString(q_ctx->get_query_source()));
+                                    continue;
+                                }
                             }
                         }
-                    }
 
-                    auto query_context = q.second;
+                        auto query_context = q.second;
 
-                    auto itr = running_fes.find(query_context->coord_addr);
-                    if (itr != running_fes.end()) {
-                        if (q.second->get_fe_process_uuid() == itr->second.info.process_uuid ||
-                            itr->second.info.process_uuid == 0) {
-                            continue;
-                        } else {
-                            LOG_WARNING("Coordinator of query {} restarted, going to cancel it.",
+                        auto itr = running_fes.find(query_context->coord_addr);
+                        if (itr != running_fes.end()) {
+                            if (q.second->get_fe_process_uuid() == itr->second.info.process_uuid ||
+                                itr->second.info.process_uuid == 0) {
+                                continue;
+                            } else {
+                                LOG_WARNING(
+                                        "Coordinator of query {} restarted, going to cancel it.",
                                         print_id(q.second->query_id()));
-                        }
-                    } else {
-                        // In some rear cases, the rpc port of follower is not updated in time,
-                        // then the port of this follower will be zero, but acutally it is still running,
-                        // and be has already received the query from follower.
-                        // So we need to check if host is in running_fes.
-                        bool fe_host_is_standing =
-                                std::any_of(running_fes.begin(), running_fes.end(),
-                                            [query_context](const auto& fe) {
-                                                return fe.first.hostname ==
-                                                               query_context->coord_addr.hostname &&
-                                                       fe.first.port == 0;
-                                            });
-
-                        if (fe_host_is_standing) {
-                            LOG_WARNING(
-                                    "Coordinator {}:{} is not found, but its host is still "
-                                    "running with an unstable rpc port, not going to cancel "
-                                    "it.",
-                                    query_context->coord_addr.hostname,
-                                    query_context->coord_addr.port,
-                                    print_id(query_context->query_id()));
-                            continue;
+                            }
                         } else {
-                            LOG_WARNING(
-                                    "Could not find target coordinator {}:{} of query {}, "
-                                    "going to "
-                                    "cancel it.",
-                                    query_context->coord_addr.hostname,
-                                    query_context->coord_addr.port,
-                                    print_id(query_context->query_id()));
+                            // In some rear cases, the rpc port of follower is not updated in time,
+                            // then the port of this follower will be zero, but acutally it is still running,
+                            // and be has already received the query from follower.
+                            // So we need to check if host is in running_fes.
+                            bool fe_host_is_standing = std::any_of(
+                                    running_fes.begin(), running_fes.end(),
+                                    [query_context](const auto& fe) {
+                                        return fe.first.hostname ==
+                                                       query_context->coord_addr.hostname &&
+                                               fe.first.port == 0;
+                                    });
+
+                            if (fe_host_is_standing) {
+                                LOG_WARNING(
+                                        "Coordinator {}:{} is not found, but its host is still "
+                                        "running with an unstable rpc port, not going to cancel "
+                                        "it.",
+                                        query_context->coord_addr.hostname,
+                                        query_context->coord_addr.port,
+                                        print_id(query_context->query_id()));
+                                continue;
+                            } else {
+                                LOG_WARNING(
+                                        "Could not find target coordinator {}:{} of query {}, "
+                                        "going to "
+                                        "cancel it.",
+                                        query_context->coord_addr.hostname,
+                                        query_context->coord_addr.port,
+                                        print_id(query_context->query_id()));
+                            }
                         }
-                    }
 
-                    // Coorninator of this query has already dead.
-                    queries_to_cancel.push_back(q.first);
-                }
+                        // Coorninator of this query has already dead.
+                        queries_to_cancel.push_back(q.first);
+                    }
+                    return Status::OK();
+                });
             }
         }
 
@@ -1493,15 +1547,18 @@ void FragmentMgr::cancel_worker() {
 
 void FragmentMgr::debug(std::stringstream& ss) {
     // Keep things simple
-    std::lock_guard<std::mutex> lock(_lock);
-
-    ss << "FragmentMgr have " << _fragment_instance_map.size() << " jobs.\n";
+    ss << "FragmentMgr have " << _fragment_instance_map.num_items() << " jobs.\n";
     ss << "job_id\t\tstart_time\t\texecute_time(s)\n";
     VecDateTimeValue now = VecDateTimeValue::local_time();
-    for (auto& it : _fragment_instance_map) {
-        ss << it.first << "\t" << it.second->start_time().debug_string() << "\t"
-           << now.second_diff(it.second->start_time()) << "\n";
-    }
+    _fragment_instance_map.apply(
+            [&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<PlanFragmentExecutor>>& map)
+                    -> Status {
+                for (auto& it : map) {
+                    ss << it.first << "\t" << it.second->start_time().debug_string() << "\t"
+                       << now.second_diff(it.second->start_time()) << "\n";
+                }
+                return Status::OK();
+            });
 }
 
 void FragmentMgr::_check_brpc_available(const std::shared_ptr<PBackendService_Stub>& brpc_stub,
@@ -1684,26 +1741,22 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
 
     RuntimeFilterMgr* runtime_filter_mgr = nullptr;
     if (is_pipeline) {
-        std::unique_lock<std::mutex> lock(_lock);
-        auto iter = _pipeline_map.find(tfragment_instance_id);
-        if (iter == _pipeline_map.end()) {
+        pip_context = _pipeline_map.find(tfragment_instance_id);
+        if (pip_context == nullptr) {
             VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id;
             return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
         }
-        pip_context = iter->second;
 
         DCHECK(pip_context != nullptr);
         runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
         query_thread_context = {pip_context->get_query_ctx()->query_id(),
                                 pip_context->get_query_ctx()->query_mem_tracker};
     } else {
-        std::unique_lock<std::mutex> lock(_lock);
-        auto iter = _fragment_instance_map.find(tfragment_instance_id);
-        if (iter == _fragment_instance_map.end()) {
+        fragment_executor = _fragment_instance_map.find(tfragment_instance_id);
+        if (fragment_executor == nullptr) {
             VLOG_CRITICAL << "unknown.... fragment instance id:" << print_id(tfragment_instance_id);
             return Status::InvalidArgument("fragment-id: {}", print_id(tfragment_instance_id));
         }
-        fragment_executor = iter->second;
 
         DCHECK(fragment_executor != nullptr);
         runtime_filter_mgr =
@@ -1730,16 +1783,14 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
 
     const auto& fragment_instance_ids = request->fragment_instance_ids();
     {
-        std::unique_lock<std::mutex> lock(_lock);
         for (UniqueId fragment_instance_id : fragment_instance_ids) {
             TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
 
             if (is_pipeline) {
-                auto iter = _pipeline_map.find(tfragment_instance_id);
-                if (iter == _pipeline_map.end()) {
+                pip_context = _pipeline_map.find(tfragment_instance_id);
+                if (pip_context == nullptr) {
                     continue;
                 }
-                pip_context = iter->second;
 
                 DCHECK(pip_context != nullptr);
                 runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
@@ -1748,11 +1799,10 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
                                         pip_context->get_query_ctx()->query_mem_tracker,
                                         pip_context->get_query_ctx()->workload_group()};
             } else {
-                auto iter = _fragment_instance_map.find(tfragment_instance_id);
-                if (iter == _fragment_instance_map.end()) {
+                fragment_executor = _fragment_instance_map.find(tfragment_instance_id);
+                if (fragment_executor == nullptr) {
                     continue;
                 }
-                fragment_executor = iter->second;
 
                 DCHECK(fragment_executor != nullptr);
                 runtime_filter_mgr = fragment_executor->get_query_ctx()->runtime_filter_mgr();
@@ -1796,14 +1846,11 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
         TUniqueId query_id;
         query_id.__set_hi(queryid.hi);
         query_id.__set_lo(queryid.lo);
-        std::shared_lock lock(_query_ctx_map_lock);
-        auto iter = _query_ctx_map.find(query_id);
-        if (iter == _query_ctx_map.end()) {
+        query_ctx = _query_ctx_map.find(query_id);
+        if (query_ctx == nullptr) {
             return Status::EndOfFile("Query context (query-id: {}) not found, maybe finished",
                                      queryid.to_string());
         }
-
-        query_ctx = iter->second;
     }
 
     std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
@@ -1819,13 +1866,10 @@ Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
         TUniqueId query_id;
         query_id.__set_hi(queryid.hi);
         query_id.__set_lo(queryid.lo);
-        std::shared_lock lock(_query_ctx_map_lock);
-        auto iter = _query_ctx_map.find(query_id);
-        if (iter == _query_ctx_map.end()) {
+        query_ctx = _query_ctx_map.find(query_id);
+        if (query_ctx == nullptr) {
             return Status::InvalidArgument("query-id: {}", queryid.to_string());
         }
-
-        query_ctx = iter->second;
     }
     return query_ctx->runtime_filter_mgr()->sync_filter_size(request);
 }
@@ -1842,15 +1886,10 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
         TUniqueId query_id;
         query_id.__set_hi(queryid.hi);
         query_id.__set_lo(queryid.lo);
-        std::shared_lock lock(_query_ctx_map_lock);
-        auto iter = _query_ctx_map.find(query_id);
-        if (iter == _query_ctx_map.end()) {
+        query_ctx = _query_ctx_map.find(query_id);
+        if (query_ctx == nullptr) {
             return Status::InvalidArgument("query-id: {}", queryid.to_string());
         }
-
-        // hold reference to pip_context, or else runtime_state can be destroyed
-        // when filter_controller->merge is still in progress
-        query_ctx = iter->second;
     }
     SCOPED_ATTACH_TASK(query_ctx.get());
     auto merge_status = filter_controller->merge(request, attach_data, opt_remote_rf);
@@ -1936,17 +1975,19 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFrag
 }
 
 void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) {
-    {
-        std::shared_lock lock(_query_ctx_map_lock);
-        for (const auto& q : _query_ctx_map) {
-            WorkloadQueryInfo workload_query_info;
-            workload_query_info.query_id = print_id(q.first);
-            workload_query_info.tquery_id = q.first;
-            workload_query_info.wg_id =
-                    q.second->workload_group() == nullptr ? -1 : q.second->workload_group()->id();
-            query_info_list->push_back(workload_query_info);
-        }
-    }
+    _query_ctx_map.apply(
+            [&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<QueryContext>>& map) -> Status {
+                for (const auto& q : map) {
+                    WorkloadQueryInfo workload_query_info;
+                    workload_query_info.query_id = print_id(q.first);
+                    workload_query_info.tquery_id = q.first;
+                    workload_query_info.wg_id = q.second->workload_group() == nullptr
+                                                        ? -1
+                                                        : q.second->workload_group()->id();
+                    query_info_list->push_back(workload_query_info);
+                }
+                return Status::OK();
+            });
 }
 
 } // namespace doris
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 53cea30686f..1ceedb5337d 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -68,6 +68,46 @@ class WorkloadQueryInfo;
 
 std::string to_load_error_http_path(const std::string& file_name);
 
+template <typename Key, typename Value, typename ValueType>
+class ConcurrentContextMap {
+public:
+    using ApplyFunction = std::function<Status(phmap::flat_hash_map<Key, Value>&)>;
+    ConcurrentContextMap();
+    Value find(const Key& query_id);
+    void insert(const Key& query_id, std::shared_ptr<ValueType>);
+    void clear();
+    void erase(const Key& query_id);
+    size_t num_items() const {
+        size_t n = 0;
+        for (auto& pair : _internal_map) {
+            std::shared_lock lock(*pair.first);
+            auto& map = pair.second;
+            n += map.size();
+        }
+        return n;
+    }
+    void apply(ApplyFunction&& function) {
+        for (auto& pair : _internal_map) {
+            // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must
+            // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok
+            std::unique_lock lock(*pair.first);
+            static_cast<void>(function(pair.second));
+        }
+    }
+
+    Status apply_if_not_exists(const Key& query_id, std::shared_ptr<ValueType>& query_ctx,
+                               ApplyFunction&& function);
+
+private:
+    // The lock should only be used to protect the structures in fragment manager. Has to be
+    // used in a very small scope because it may dead lock. For example, if the _lock is used
+    // in prepare stage, the call path is  prepare --> expr prepare --> may call allocator
+    // when allocate failed, allocator may call query_is_cancelled, query is callced will also
+    // call _lock, so that there is dead lock.
+    std::vector<std::pair<std::unique_ptr<std::shared_mutex>, phmap::flat_hash_map<Key, Value>>>
+            _internal_map;
+};
+
 // This class used to manage all the fragment execute in this instance
 class FragmentMgr : public RestMonitorIface {
 public:
@@ -142,10 +182,7 @@ public:
 
     std::shared_ptr<QueryContext> get_query_context(const TUniqueId& query_id);
 
-    int32_t running_query_num() {
-        std::shared_lock ctx_lock(_query_ctx_map_lock);
-        return _query_ctx_map.size();
-    }
+    int32_t running_query_num() { return _query_ctx_map.num_items(); }
 
     std::string dump_pipeline_tasks(int64_t duration = 0);
     std::string dump_pipeline_tasks(TUniqueId& query_id);
@@ -189,21 +226,17 @@ private:
     // This is input params
     ExecEnv* _exec_env = nullptr;
 
-    // The lock should only be used to protect the structures in fragment manager. Has to be
-    // used in a very small scope because it may dead lock. For example, if the _lock is used
-    // in prepare stage, the call path is  prepare --> expr prepare --> may call allocator
-    // when allocate failed, allocator may call query_is_cancelled, query is callced will also
-    // call _lock, so that there is dead lock.
-    std::mutex _lock;
-
     // Make sure that remove this before no data reference PlanFragmentExecutor
-    std::unordered_map<TUniqueId, std::shared_ptr<PlanFragmentExecutor>> _fragment_instance_map;
-
-    std::unordered_map<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map;
+    // (QueryID, FragmentID) -> PipelineFragmentContext
+    ConcurrentContextMap<TUniqueId, std::shared_ptr<PlanFragmentExecutor>, PlanFragmentExecutor>
+            _fragment_instance_map;
+    // (QueryID, FragmentID) -> PipelineFragmentContext
+    ConcurrentContextMap<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>,
+                         pipeline::PipelineFragmentContext>
+            _pipeline_map;
 
-    std::shared_mutex _query_ctx_map_lock;
     // query id -> QueryContext
-    phmap::flat_hash_map<TUniqueId, std::shared_ptr<QueryContext>> _query_ctx_map;
+    ConcurrentContextMap<TUniqueId, std::shared_ptr<QueryContext>, QueryContext> _query_ctx_map;
     std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map;
 
     CountDownLatch _stop_background_threads_latch;
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index dcff789043a..66623a46fbc 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -325,7 +325,13 @@ public:
         return _using_brpc_stubs;
     }
 
+    void push_instance_ids(const TUniqueId& ins_id) {
+        std::lock_guard<std::mutex> lock(_ins_lock);
+        fragment_instance_ids.push_back(ins_id);
+    }
+
 private:
+    std::mutex _ins_lock;
     TUniqueId _query_id;
     ExecEnv* _exec_env = nullptr;
     VecDateTimeValue _start_time;
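
The partition count for these maps comes from the new BE config
num_query_ctx_map_partitions introduced above (declared in config.h, defined with a
default of 128 in config.cpp). Assuming the usual key = value syntax of be.conf for
BE integer options, it could presumably be tuned like this (hypothetical example,
not part of the commit):

    # be.conf -- hypothetical tuning example; 128 is the built-in default
    num_query_ctx_map_partitions = 256

More partitions spread contention across more shard locks for per-query lookups, at
the cost of a little extra work for whole-map operations such as num_items() and
apply(), which still visit every partition.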

