This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 925da90480 [imporve](bloomfilter) refactor runtime_filter_mgr with bloomfilter (#21715) 925da90480 is described below commit 925da90480f60afc0e5333a536d41e004234874e Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Tue Jul 11 22:35:30 2023 +0800 [imporve](bloomfilter) refactor runtime_filter_mgr with bloomfilter (#21715) Reduced the granularity of the lock. In the past, the entire map was locked map(string) --> map(int) The bf does not need to init_with_fixed_length --- be/src/runtime/runtime_filter_mgr.cpp | 38 +++++++++++++++++++---------------- be/src/runtime/runtime_filter_mgr.h | 13 ++++++------ 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 4380716671..6614532d85 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -39,6 +39,7 @@ #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "util/brpc_client_cache.h" +#include "util/spinlock.h" namespace doris { @@ -208,7 +209,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* query_options, const std::vector<doris::TRuntimeFilterTargetParams>* target_info, const int producer_size) { - std::lock_guard<std::mutex> guard(_filter_map_mutex); + std::unique_lock<std::shared_mutex> guard(_filter_map_mutex); std::shared_ptr<RuntimeFilterCntlVal> cntVal = std::make_shared<RuntimeFilterCntlVal>(); // runtime_filter_desc and target will be released, // so we need to copy to cntVal @@ -219,10 +220,10 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( cntVal->filter = cntVal->pool->add(new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool)); - std::string filter_id = std::to_string(runtime_filter_desc->filter_id); + auto filter_id = runtime_filter_desc->filter_id; 
// LOG(INFO) << "entity filter id:" << filter_id; cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options, -1, false); - _filter_map.emplace(filter_id, cntVal); + _filter_map.emplace(filter_id, CntlValwithLock {cntVal, std::make_unique<SpinLock>()}); return Status::OK(); } @@ -230,7 +231,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* query_options, const std::vector<doris::TRuntimeFilterTargetParamsV2>* targetv2_info, const int producer_size) { - std::lock_guard<std::mutex> guard(_filter_map_mutex); + std::unique_lock<std::shared_mutex> guard(_filter_map_mutex); std::shared_ptr<RuntimeFilterCntlVal> cntVal = std::make_shared<RuntimeFilterCntlVal>(); // runtime_filter_desc and target will be released, // so we need to copy to cntVal @@ -241,10 +242,10 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( cntVal->filter = cntVal->pool->add(new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool)); - std::string filter_id = std::to_string(runtime_filter_desc->filter_id); + auto filter_id = runtime_filter_desc->filter_id; // LOG(INFO) << "entity filter id:" << filter_id; cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options); - _filter_map.emplace(filter_id, cntVal); + _filter_map.emplace(filter_id, CntlValwithLock {cntVal, std::make_unique<SpinLock>()}); return Status::OK(); } @@ -312,34 +313,37 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ std::shared_ptr<RuntimeFilterCntlVal> cntVal; int merged_size = 0; int64_t merge_time = 0; + int64_t start_merge = MonotonicMillis(); + auto filter_id = request->filter_id(); + std::map<int, CntlValwithLock>::iterator iter; { - int64_t start_merge = MonotonicMillis(); - std::lock_guard<std::mutex> guard(_filter_map_mutex); - auto iter = _filter_map.find(std::to_string(request->filter_id())); + std::shared_lock<std::shared_mutex> guard(_filter_map_mutex); + 
iter = _filter_map.find(filter_id); VLOG_ROW << "recv filter id:" << request->filter_id() << " " << request->ShortDebugString(); if (iter == _filter_map.end()) { return Status::InvalidArgument("unknown filter id {}", std::to_string(request->filter_id())); } - cntVal = iter->second; - if (auto bf = cntVal->filter->get_bloomfilter()) { - RETURN_IF_ERROR(bf->init_with_fixed_length()); - } + } + // iter->second = pair{CntlVal,SpinLock} + cntVal = iter->second.first; + { + std::lock_guard<SpinLock> l(*iter->second.second); MergeRuntimeFilterParams params(request, attach_data); - ObjectPool* pool = iter->second->pool.get(); + ObjectPool* pool = cntVal->pool.get(); RuntimeFilterWrapperHolder holder; RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, &params, pool, holder.getHandle())); RETURN_IF_ERROR(cntVal->filter->merge_from(holder.getHandle()->get())); - cntVal->arrive_id.insert(UniqueId(request->fragment_id()).to_string()); + cntVal->arrive_id.insert(UniqueId(request->fragment_id())); merged_size = cntVal->arrive_id.size(); // TODO: avoid log when we had acquired a lock VLOG_ROW << "merge size:" << merged_size << ":" << cntVal->producer_size; DCHECK_LE(merged_size, cntVal->producer_size); - iter->second->merge_time += (MonotonicMillis() - start_merge); + cntVal->merge_time += (MonotonicMillis() - start_merge); if (merged_size < cntVal->producer_size) { return Status::OK(); } else { - merge_time = iter->second->merge_time; + merge_time = cntVal->merge_time; } } diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 7d7e7a0ba1..2048229cd2 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -26,6 +26,7 @@ #include <map> #include <memory> #include <mutex> +#include <shared_mutex> #include <string> #include <unordered_map> #include <unordered_set> @@ -145,12 +146,12 @@ public: std::vector<doris::TRuntimeFilterTargetParams> target_info; std::vector<doris::TRuntimeFilterTargetParamsV2>
targetv2_info; IRuntimeFilter* filter; - std::unordered_set<std::string> arrive_id; // fragment_instance_id ? + std::unordered_set<UniqueId> arrive_id; // fragment_instance_id ? std::shared_ptr<ObjectPool> pool; }; public: - RuntimeFilterCntlVal* get_filter(int id) { return _filter_map[std::to_string(id)].get(); } + RuntimeFilterCntlVal* get_filter(int id) { return _filter_map[id].first.get(); } private: Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc, @@ -166,11 +167,11 @@ private: UniqueId _query_id; UniqueId _fragment_instance_id; // protect _filter_map - std::mutex _filter_map_mutex; + std::shared_mutex _filter_map_mutex; std::shared_ptr<MemTracker> _mem_tracker; - // TODO: convert filter id to i32 - // filter-id -> val - std::map<std::string, std::shared_ptr<RuntimeFilterCntlVal>> _filter_map; + using CntlValwithLock = + std::pair<std::shared_ptr<RuntimeFilterCntlVal>, std::unique_ptr<SpinLock>>; + std::map<int, CntlValwithLock> _filter_map; RuntimeState* _state; bool _opt_remote_rf = true; }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org