This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d430aec3ae [Bug](bloomfilter) fix concurrency bug caused by bloom filter (#13306) d430aec3ae is described below commit d430aec3aeded447823b155756282b258852777b Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Thu Oct 13 09:10:02 2022 +0800 [Bug](bloomfilter) fix concurrency bug caused by bloom filter (#13306) --- be/src/runtime/fragment_mgr.cpp | 58 +++++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 0471b63d71..b18e80df3e 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -670,29 +670,32 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi _runtimefilter_controller.add_entity(params, &handler); exec_state->set_merge_controller_handler(handler); - auto& runtime_filter_params = params.params.runtime_filter_params; - if (!runtime_filter_params.rid_to_runtime_filter.empty()) { - _bf_size_map.insert({fragments_ctx->query_id, {}}); - } - for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) { - int filter_id = filterid_to_desc.first; - const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id); - if (target_iter == runtime_filter_params.rid_to_target_param.end()) { - continue; - } - const auto& build_iter = runtime_filter_params.runtime_filter_builder_num.find(filter_id); - if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) { - continue; - } - if (filterid_to_desc.second.__isset.bloom_filter_size_bytes) { - _bf_size_map[fragments_ctx->query_id].insert( - {filter_id, filterid_to_desc.second.bloom_filter_size_bytes}); - } - } - RETURN_IF_ERROR(exec_state->prepare(params)); { std::lock_guard<std::mutex> lock(_lock); + auto& runtime_filter_params = params.params.runtime_filter_params; + if 
(!runtime_filter_params.rid_to_runtime_filter.empty()) { + auto bf_size_for_cur_query = _bf_size_map.find(fragments_ctx->query_id); + if (bf_size_for_cur_query == _bf_size_map.end()) { + _bf_size_map.insert({fragments_ctx->query_id, {}}); + } + for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) { + int filter_id = filterid_to_desc.first; + const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id); + if (target_iter == runtime_filter_params.rid_to_target_param.end()) { + continue; + } + const auto& build_iter = + runtime_filter_params.runtime_filter_builder_num.find(filter_id); + if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) { + continue; + } + if (filterid_to_desc.second.__isset.bloom_filter_size_bytes) { + _bf_size_map[fragments_ctx->query_id].insert( + {filter_id, filterid_to_desc.second.bloom_filter_size_bytes}); + } + } + } _fragment_map.insert(std::make_pair(params.params.fragment_instance_id, exec_state)); _cv.notify_all(); } @@ -979,12 +982,15 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, const char* UniqueId queryid = request->query_id(); std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller; RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller)); - auto bf_size_for_cur_query = _bf_size_map.find(queryid.to_thrift()); - if (bf_size_for_cur_query != _bf_size_map.end()) { - for (auto& iter : bf_size_for_cur_query->second) { - auto bf = filter_controller->get_filter(iter.first)->filter->get_bloomfilter(); - DCHECK(bf != nullptr); - bf->init_with_fixed_length(iter.second); + { + std::lock_guard<std::mutex> lock(_lock); + auto bf_size_for_cur_query = _bf_size_map.find(queryid.to_thrift()); + if (bf_size_for_cur_query != _bf_size_map.end()) { + for (auto& iter : bf_size_for_cur_query->second) { + auto bf = filter_controller->get_filter(iter.first)->filter->get_bloomfilter(); + DCHECK(bf != nullptr); + 
bf->init_with_fixed_length(iter.second); + } } } RETURN_IF_ERROR(filter_controller->merge(request, attach_data)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org