This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 441b450a79 (runtimefilter) shorter time prepare consumes (#13127) 441b450a79 is described below commit 441b450a79e1a65beb18a7ad57ad62b27911ce3c Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com> AuthorDate: Thu Oct 6 10:12:29 2022 +0800 (runtimefilter) shorter time prepare consumes (#13127) Now, every prepare puts a runtime filter controller, so it takes the mutex lock on the controller map. Initializing the bloom filter takes some time for allocation and memset. If we run p1 tests with -parallel=20 -suiteParallel=20 -actionParallel=20, then we get error messages like 'send fragment timeout 5s'. The patch fixes the problem in the following 2 ways: 1. Replace the single mutex with 128 sharded mutexes. 2. If a plan fragment does not have a runtime filter, it does not need to take the locks. --- be/src/runtime/runtime_filter_mgr.cpp | 38 ++++++++++++++++++++--------------- be/src/runtime/runtime_filter_mgr.h | 15 ++++++++++---- 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 34c9fb33e3..73a9115a23 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -280,49 +280,55 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ Status RuntimeFilterMergeController::add_entity( const TExecPlanFragmentParams& params, std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) { + if (!params.params.__isset.runtime_filter_params || + params.params.runtime_filter_params.rid_to_runtime_filter.size() == 0) { + return Status::OK(); + } + runtime_filter_merge_entity_closer entity_closer = std::bind(runtime_filter_merge_entity_close, this, std::placeholders::_1); - std::lock_guard<std::mutex> guard(_controller_mutex); UniqueId query_id(params.params.query_id); std::string query_id_str = query_id.to_string(); - auto iter = 
_filter_controller_map.find(query_id_str); UniqueId fragment_instance_id = UniqueId(params.params.fragment_instance_id); + uint32_t shard = _get_controller_shard_idx(query_id); + std::lock_guard<std::mutex> guard(_controller_mutex[shard]); + auto iter = _filter_controller_map[shard].find(query_id_str); - if (iter == _filter_controller_map.end()) { + if (iter == _filter_controller_map[shard].end()) { *handle = std::shared_ptr<RuntimeFilterMergeControllerEntity>( new RuntimeFilterMergeControllerEntity(), entity_closer); - _filter_controller_map[query_id_str] = *handle; + _filter_controller_map[shard][query_id_str] = *handle; const TRuntimeFilterParams& filter_params = params.params.runtime_filter_params; - if (params.params.__isset.runtime_filter_params) { - RETURN_IF_ERROR(handle->get()->init(query_id, fragment_instance_id, filter_params, - params.query_options)); - } + RETURN_IF_ERROR(handle->get()->init(query_id, fragment_instance_id, filter_params, + params.query_options)); } else { - *handle = _filter_controller_map[query_id_str].lock(); + *handle = _filter_controller_map[shard][query_id_str].lock(); } return Status::OK(); } Status RuntimeFilterMergeController::acquire( UniqueId query_id, std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) { - std::lock_guard<std::mutex> guard(_controller_mutex); + uint32_t shard = _get_controller_shard_idx(query_id); + std::lock_guard<std::mutex> guard(_controller_mutex[shard]); std::string query_id_str = query_id.to_string(); - auto iter = _filter_controller_map.find(query_id_str); - if (iter == _filter_controller_map.end()) { + auto iter = _filter_controller_map[shard].find(query_id_str); + if (iter == _filter_controller_map[shard].end()) { LOG(WARNING) << "not found entity, query-id:" << query_id_str; return Status::InvalidArgument("not found entity"); } - *handle = _filter_controller_map[query_id_str].lock(); + *handle = _filter_controller_map[shard][query_id_str].lock(); if (*handle == nullptr) { return 
Status::InvalidArgument("entity is closed"); } return Status::OK(); } -Status RuntimeFilterMergeController::remove_entity(UniqueId queryId) { - std::lock_guard<std::mutex> guard(_controller_mutex); - _filter_controller_map.erase(queryId.to_string()); +Status RuntimeFilterMergeController::remove_entity(UniqueId query_id) { + uint32_t shard = _get_controller_shard_idx(query_id); + std::lock_guard<std::mutex> guard(_controller_mutex[shard]); + _filter_controller_map[shard].erase(query_id.to_string()); return Status::OK(); } diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 1471f664f7..1dbb75471e 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -22,6 +22,7 @@ #include <memory> #include <mutex> #include <thread> +#include <unordered_map> #include "common/object_pool.h" #include "common/status.h" @@ -162,16 +163,22 @@ public: // thread safe // remove a entity by query-id // remove_entity will be called automatically by entity when entity is destroyed - Status remove_entity(UniqueId queryId); + Status remove_entity(UniqueId query_id); + + static const int kShardNum = 128; private: - std::mutex _controller_mutex; + uint32_t _get_controller_shard_idx(UniqueId& query_id) { + return (uint32_t)query_id.hi % kShardNum; + } + + std::mutex _controller_mutex[kShardNum]; // We store the weak pointer here. 
// When the external object is destroyed, we need to clear this record using FilterControllerMap = - std::map<std::string, std::weak_ptr<RuntimeFilterMergeControllerEntity>>; + std::unordered_map<std::string, std::weak_ptr<RuntimeFilterMergeControllerEntity>>; // str(query-id) -> entity - FilterControllerMap _filter_controller_map; + FilterControllerMap _filter_controller_map[kShardNum]; }; using runtime_filter_merge_entity_closer = std::function<void(RuntimeFilterMergeControllerEntity*)>; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org