This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 9272c650b4d [Refactor](query) refactor lock in fragment mgr and change std::unorder_map to phmap (#45069) 9272c650b4d is described below commit 9272c650b4dbb4781d010f4f68afe721a42fbc0d Author: HappenLee <happen...@selectdb.com> AuthorDate: Thu Dec 19 22:27:33 2024 +0800 [Refactor](query) refactor lock in fragment mgr and change std::unorder_map to phmap (#45069) ### What problem does this PR solve? Related PR: #44821 --- be/src/runtime/fragment_mgr.cpp | 71 +++++++++++++++++++++++++++-------------- be/src/runtime/fragment_mgr.h | 5 +-- 2 files changed, 50 insertions(+), 26 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index f43190ebb36..b7bbaf8f206 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -271,12 +271,17 @@ void FragmentMgr::stop() { { std::lock_guard<std::mutex> lock(_lock); _fragment_instance_map.clear(); - _query_ctx_map.clear(); for (auto& pipeline : _pipeline_map) { pipeline.second->close_sink(); } _pipeline_map.clear(); } + + { + std::unique_lock lock(_query_ctx_map_lock); + _query_ctx_map.clear(); + } + _async_report_thread_pool->shutdown(); } @@ -620,11 +625,11 @@ void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex _fragment_instance_map.erase(fragment_executor->fragment_instance_id()); LOG_INFO("Instance {} finished", print_id(fragment_executor->fragment_instance_id())); - - if (all_done && query_ctx) { - _query_ctx_map.erase(query_ctx->query_id()); - LOG_INFO("Query {} finished", print_id(query_ctx->query_id())); - } + } + if (all_done && query_ctx) { + std::unique_lock lock(_query_ctx_map_lock); + _query_ctx_map.erase(query_ctx->query_id()); + LOG_INFO("Query {} finished", print_id(query_ctx->query_id())); } // Callback after remove from this id @@ -713,8 +718,10 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r query_id.__set_lo(request->query_id().lo()); std::shared_ptr<QueryContext> q_ctx = nullptr; { - std::lock_guard<std::mutex> lock(_lock); - + TUniqueId query_id; + query_id.__set_hi(request->query_id().hi()); + query_id.__set_lo(request->query_id().lo()); + std::shared_lock lock(_query_ctx_map_lock); auto search = _query_ctx_map.find(query_id); if (search == _query_ctx_map.end()) { return Status::InternalError( @@ -732,22 +739,24 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r void FragmentMgr::remove_pipeline_context( std::shared_ptr<pipeline::PipelineFragmentContext> f_context) { auto* q_context = f_context->get_query_ctx(); + bool all_done = false; + TUniqueId query_id = f_context->get_query_id(); { std::lock_guard<std::mutex> lock(_lock); - auto query_id = f_context->get_query_id(); std::vector<TUniqueId> ins_ids; f_context->instance_ids(ins_ids); - bool all_done = q_context->countdown(ins_ids.size()); + all_done = q_context->countdown(ins_ids.size()); for (const auto& ins_id : ins_ids) { LOG_INFO("Removing query {} instance {}, all done? {}", print_id(query_id), print_id(ins_id), all_done); _pipeline_map.erase(ins_id); g_pipeline_fragment_instances_count << -1; } - if (all_done) { - LOG_INFO("Query {} finished", print_id(query_id)); - _query_ctx_map.erase(query_id); - } + } + if (all_done) { + std::unique_lock lock(_query_ctx_map_lock); + _query_ctx_map.erase(query_id); + LOG_INFO("Query {} finished", print_id(query_id)); } } @@ -759,7 +768,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo { return Status::InternalError("FragmentMgr._get_query_ctx.failed"); }); if (params.is_simplified_param) { // Get common components from _query_ctx_map - std::lock_guard<std::mutex> lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto search = _query_ctx_map.find(query_id); if (search == _query_ctx_map.end()) { return Status::InternalError( @@ -771,7 +780,16 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo } else { // Find _query_ctx_map, in case some other request has already // create the query fragments context. - std::lock_guard<std::mutex> lock(_lock); + { + std::shared_lock lock(_query_ctx_map_lock); + auto search = _query_ctx_map.find(query_id); + if (search != _query_ctx_map.end()) { + query_ctx = search->second; + return Status::OK(); + } + } + + std::unique_lock lock(_query_ctx_map_lock); auto search = _query_ctx_map.find(query_id); if (search != _query_ctx_map.end()) { query_ctx = search->second; @@ -1170,7 +1188,7 @@ void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext* query } std::shared_ptr<QueryContext> FragmentMgr::get_query_context(const TUniqueId& query_id) { - std::lock_guard<std::mutex> state_lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto ctx = _query_ctx_map.find(query_id); if (ctx != _query_ctx_map.end()) { return ctx->second; @@ -1184,7 +1202,7 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan std::shared_ptr<QueryContext> query_ctx; std::vector<TUniqueId> all_instance_ids; { - std::lock_guard<std::mutex> state_lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto ctx_iter = _query_ctx_map.find(query_id); if (ctx_iter == _query_ctx_map.end()) { @@ -1251,7 +1269,7 @@ void FragmentMgr::cancel_instance(const TUniqueId& instance_id, void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t fragment_id, const PPlanFragmentCancelReason& reason, const std::string& msg) { - std::unique_lock<std::mutex> lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto q_ctx_iter = _query_ctx_map.find(query_id); if (q_ctx_iter != _query_ctx_map.end()) { // Has to use value to keep the shared ptr not deconstructed. @@ -1315,6 +1333,9 @@ void FragmentMgr::cancel_worker() { pipeline_itr.second->clear_finished_tasks(); } } + } + { + std::unique_lock lock(_query_ctx_map_lock); for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) { if (it->second->is_timeout(now)) { LOG_WARNING("Query {} is timeout", print_id(it->first)); @@ -1335,7 +1356,9 @@ void FragmentMgr::cancel_worker() { ++it; } } - + } + { + std::shared_lock lock(_query_ctx_map_lock); // We use a very conservative cancel strategy. // 0. If there are no running frontends, do not cancel any queries. // 1. If query's process uuid is zero, do not cancel @@ -1773,7 +1796,7 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) { TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - std::lock_guard<std::mutex> lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto iter = _query_ctx_map.find(query_id); if (iter == _query_ctx_map.end()) { return Status::EndOfFile("Query context (query-id: {}) not found, maybe finished", @@ -1796,7 +1819,7 @@ Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) { TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - std::lock_guard<std::mutex> lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto iter = _query_ctx_map.find(query_id); if (iter == _query_ctx_map.end()) { return Status::InvalidArgument("query-id: {}", queryid.to_string()); @@ -1819,7 +1842,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - std::lock_guard<std::mutex> lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto iter = _query_ctx_map.find(query_id); if (iter == _query_ctx_map.end()) { return Status::InvalidArgument("query-id: {}", queryid.to_string()); @@ -1914,7 +1937,7 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFrag void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) { { - std::lock_guard<std::mutex> lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); for (const auto& q : _query_ctx_map) { WorkloadQueryInfo workload_query_info; workload_query_info.query_id = print_id(q.first); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 0c1bb3033d9..53cea30686f 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -143,7 +143,7 @@ public: std::shared_ptr<QueryContext> get_query_context(const TUniqueId& query_id); int32_t running_query_num() { - std::unique_lock<std::mutex> ctx_lock(_lock); + std::shared_lock ctx_lock(_query_ctx_map_lock); return _query_ctx_map.size(); } @@ -201,8 +201,9 @@ private: std::unordered_map<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map; + std::shared_mutex _query_ctx_map_lock; // query id -> QueryContext - std::unordered_map<TUniqueId, std::shared_ptr<QueryContext>> _query_ctx_map; + phmap::flat_hash_map<TUniqueId, std::shared_ptr<QueryContext>> _query_ctx_map; std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map; CountDownLatch _stop_background_threads_latch; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org