This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.1-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 65247d03b081f1b2618efab18c6b262fc28850fe Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com> AuthorDate: Thu Oct 6 10:12:29 2022 +0800 (runtimefilter) shorter time prepare consumes (#13127) Now, every preare put a runtime filter controller, so it takes the mutex lock on the controller map. Init of bloom filter takes some time in allocate and memset. If we run p1 tests with -parallel=20 -suiteParallel=20 -actionParallel=20, then we get error message like 'send fragment timeout 5s'. The patch fixes the problem in the following 2 ways: 1. Replace one mutex block with 128s. 2. If a plan fragment does not have a runtime filter, it does not need to take the locks. --- be/src/runtime/runtime_filter_mgr.cpp | 32 ++++++++++++++++++++------------ be/src/runtime/runtime_filter_mgr.h | 15 +++++++++++---- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index b5302aeace..ffea62a155 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -272,48 +272,56 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ Status RuntimeFilterMergeController::add_entity( const TExecPlanFragmentParams& params, std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) { + if (!params.params.__isset.runtime_filter_params || + params.params.runtime_filter_params.rid_to_runtime_filter.size() == 0) { + return Status::OK(); + } + runtime_filter_merge_entity_closer entity_closer = std::bind(runtime_filter_merge_entity_close, this, std::placeholders::_1); - std::lock_guard<std::mutex> guard(_controller_mutex); UniqueId query_id(params.params.query_id); std::string query_id_str = query_id.to_string(); - auto iter = _filter_controller_map.find(query_id_str); UniqueId fragment_instance_id = UniqueId(params.params.fragment_instance_id); + uint32_t shard = _get_controller_shard_idx(query_id); + std::lock_guard<std::mutex> guard(_controller_mutex[shard]); + auto iter = _filter_controller_map[shard].find(query_id_str); - if (iter == _filter_controller_map.end()) { + if (iter == _filter_controller_map[shard].end()) { *handle = std::shared_ptr<RuntimeFilterMergeControllerEntity>( new RuntimeFilterMergeControllerEntity(), entity_closer); - _filter_controller_map[query_id_str] = *handle; + _filter_controller_map[shard][query_id_str] = *handle; const TRuntimeFilterParams& filter_params = params.params.runtime_filter_params; if (params.params.__isset.runtime_filter_params) { RETURN_IF_ERROR(handle->get()->init(query_id, fragment_instance_id, filter_params, params.query_options)); } } else { - *handle = _filter_controller_map[query_id_str].lock(); + *handle = _filter_controller_map[shard][query_id_str].lock(); } return Status::OK(); } Status RuntimeFilterMergeController::acquire( UniqueId query_id, std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) { - std::lock_guard<std::mutex> guard(_controller_mutex); + uint32_t shard = _get_controller_shard_idx(query_id); + std::lock_guard<std::mutex> guard(_controller_mutex[shard]); std::string query_id_str = query_id.to_string(); - auto iter = _filter_controller_map.find(query_id_str); - if (iter == _filter_controller_map.end()) { + auto iter = _filter_controller_map[shard].find(query_id_str); + if (iter == _filter_controller_map[shard].end()) { LOG(WARNING) << "not found entity, query-id:" << query_id_str; return Status::InvalidArgument("not found entity"); } - *handle = _filter_controller_map[query_id_str].lock(); + *handle = _filter_controller_map[shard][query_id_str].lock(); if (*handle == nullptr) { return Status::InvalidArgument("entity is closed"); } return Status::OK(); } -Status RuntimeFilterMergeController::remove_entity(UniqueId queryId) { - std::lock_guard<std::mutex> guard(_controller_mutex); - _filter_controller_map.erase(queryId.to_string()); +Status RuntimeFilterMergeController::remove_entity(UniqueId query_id) { + uint32_t shard = _get_controller_shard_idx(query_id); + std::lock_guard<std::mutex> guard(_controller_mutex[shard]); + _filter_controller_map[shard].erase(query_id.to_string()); return Status::OK(); } diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 653ce675b2..346a3cbab8 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -22,6 +22,7 @@ #include <memory> #include <mutex> #include <thread> +#include <unordered_map> #include "common/object_pool.h" #include "common/status.h" @@ -163,16 +164,22 @@ public: // thread safe // remove a entity by query-id // remove_entity will be called automatically by entity when entity is destroyed - Status remove_entity(UniqueId queryId); + Status remove_entity(UniqueId query_id); + + static const int kShardNum = 128; private: - std::mutex _controller_mutex; + uint32_t _get_controller_shard_idx(UniqueId& query_id) { + return (uint32_t)query_id.hi % kShardNum; + } + + std::mutex _controller_mutex[kShardNum]; // We store the weak pointer here. // When the external object is destroyed, we need to clear this record using FilterControllerMap = - std::map<std::string, std::weak_ptr<RuntimeFilterMergeControllerEntity>>; + std::unordered_map<std::string, std::weak_ptr<RuntimeFilterMergeControllerEntity>>; // str(query-id) -> entity - FilterControllerMap _filter_controller_map; + FilterControllerMap _filter_controller_map[kShardNum]; }; using runtime_filter_merge_entity_closer = std::function<void(RuntimeFilterMergeControllerEntity*)>; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org